| /*=============================================================================# |
| # Copyright (c) 2009, 2021 Stephan Wahlbrink and others. |
| # |
| # This program and the accompanying materials are made available under the |
| # terms of the Eclipse Public License 2.0 which is available at |
| # https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 |
| # which is available at https://www.apache.org/licenses/LICENSE-2.0. |
| # |
| # SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 |
| # |
| # Contributors: |
| # Stephan Wahlbrink <sw@wahlbrink.eu> - initial API and implementation |
| #=============================================================================*/ |
| |
| package org.eclipse.statet.rj.servi.pool; |
| |
| import static org.eclipse.statet.jcommons.lang.ObjectUtils.nonNullAssert; |
| |
| import java.lang.management.ManagementFactory; |
| import java.net.MalformedURLException; |
| import java.net.UnknownHostException; |
| import java.rmi.RemoteException; |
| import java.rmi.registry.LocateRegistry; |
| import java.rmi.registry.Registry; |
| import java.rmi.server.RMIClientSocketFactory; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| |
| import javax.management.ObjectName; |
| import javax.management.OperationsException; |
| import javax.rmi.ssl.SslRMIClientSocketFactory; |
| |
| import org.eclipse.statet.jcommons.collections.CopyOnWriteIdentityListSet; |
| import org.eclipse.statet.jcommons.collections.ImCollections; |
| import org.eclipse.statet.jcommons.lang.Disposable; |
| import org.eclipse.statet.jcommons.lang.NonNullByDefault; |
| import org.eclipse.statet.jcommons.lang.Nullable; |
| import org.eclipse.statet.jcommons.lang.ObjectUtils.ToStringBuilder; |
| import org.eclipse.statet.jcommons.rmi.RMIAddress; |
| import org.eclipse.statet.jcommons.rmi.RMIRegistry; |
| import org.eclipse.statet.jcommons.rmi.RMIRegistryManager; |
| import org.eclipse.statet.jcommons.runtime.CommonsRuntime; |
| import org.eclipse.statet.jcommons.runtime.bundle.BundleEntry; |
| import org.eclipse.statet.jcommons.status.NullProgressMonitor; |
| import org.eclipse.statet.jcommons.status.Status; |
| import org.eclipse.statet.jcommons.status.StatusException; |
| |
| import org.eclipse.statet.internal.rj.servi.MXNetConfig; |
| import org.eclipse.statet.internal.rj.servi.MXNodeConfig; |
| import org.eclipse.statet.internal.rj.servi.MXNodeManager; |
| import org.eclipse.statet.internal.rj.servi.MXPoolConfig; |
| import org.eclipse.statet.internal.rj.servi.MXPoolStatus; |
| import org.eclipse.statet.internal.rj.servi.MXUtils; |
| import org.eclipse.statet.internal.rj.servi.PoolManager; |
| import org.eclipse.statet.rj.RjException; |
| import org.eclipse.statet.rj.RjInitFailedException; |
| import org.eclipse.statet.rj.RjInvalidConfigurationException; |
| import org.eclipse.statet.rj.server.util.RJContext; |
| import org.eclipse.statet.rj.server.util.ServerUtils; |
| import org.eclipse.statet.rj.servi.RServiUtils; |
| import org.eclipse.statet.rj.servi.jmx.PoolServerMXBean; |
| import org.eclipse.statet.rj.servi.jmx.PoolStatusMX; |
| import org.eclipse.statet.rj.servi.node.RServiImpl; |
| import org.eclipse.statet.rj.servi.node.RServiNodeConfig; |
| import org.eclipse.statet.rj.servi.node.RServiNodeFactory; |
| |
| |
| @NonNullByDefault |
| public class JMPoolServer implements PoolServer, PoolServerMXBean { |
| |
| |
| private static final byte DOWN= 0; |
| private static final byte READY= 1; |
| |
| |
| private class LifetimeController extends Thread implements Disposable { |
| |
| private final CopyOnWriteIdentityListSet<PoolManager> stoppedPoolManagers= new CopyOnWriteIdentityListSet<>(); |
| |
| public LifetimeController() { |
| super(String.format("RServiPool(%1$s)-KeepAlive", JMPoolServer.this.id)); |
| setDaemon(false); |
| setPriority(NORM_PRIORITY - 1); |
| CommonsRuntime.getEnvironment().addStoppingListener(this); |
| } |
| |
| public void add(final PoolManager poolManager) { |
| this.stoppedPoolManagers.add(poolManager); |
| } |
| |
| private int checkManagers() { |
| int count= 0; |
| for (final PoolManager manager : this.stoppedPoolManagers) { |
| if (manager.isStopped()) { |
| this.stoppedPoolManagers.remove(manager); |
| } |
| else { |
| count++; |
| } |
| } |
| return count; |
| } |
| |
| @Override |
| public void run() { |
| boolean alive= true; |
| while (alive) { |
| final byte state= JMPoolServer.this.state; |
| final int managers= checkManagers(); |
| alive= !(state == DOWN && managers == 0); |
| try { |
| sleep(200); |
| } |
| catch (final InterruptedException e) { |
| } |
| } |
| } |
| |
| @Override |
| public void dispose() { |
| if (JMPoolServer.this.state != DOWN) { |
| shutdown(); |
| } |
| } |
| |
| } |
| |
| |
| private final String id; |
| private final RJContext context; |
| |
| private volatile byte state; |
| private final LifetimeController lifetimeController; |
| |
| private final String jmBaseName; |
| private @Nullable ObjectName jmxName; |
| |
| private final Set<Integer> rmiEmbeddedPorts= new HashSet<>(); |
| private @Nullable RMIRegistry rmiRegistry; |
| private boolean rmiHostnameSet; |
| |
| private @Nullable String poolAddress; |
| |
| private final MXNetConfig currentNetConfig; |
| private volatile PoolConfig currentPoolConfig; |
| private volatile RServiNodeConfig currentNodeConfig; |
| |
| private final MXNetConfig jmNetConfig; |
| private final MXPoolConfig jmPoolConfig; |
| private final MXNodeConfig jmNodeConfig; |
| |
| private volatile boolean jmIsNodeManagementEnabled; |
| private @Nullable MXNodeManager jmNodeManager; |
| |
| private final RServiNodeFactory nodeFactory; |
| |
| private @Nullable PoolManager poolManager; |
| |
| |
| public JMPoolServer(final String id, final RJContext context) throws RjInitFailedException { |
| this(id, context, true); |
| } |
| |
| public JMPoolServer(final String id, final RJContext context, final boolean enableJM) throws RjInitFailedException { |
| this.id= id; |
| this.context= context; |
| this.jmBaseName= "RServi:rservi.id=" + getId() + ","; |
| |
| this.currentNetConfig= (MXNetConfig)MXUtils.loadInit(new MXNetConfig(this), this.context); |
| this.currentPoolConfig= (PoolConfig)MXUtils.loadInit(new PoolConfig(), this.context); |
| this.currentNodeConfig= (RServiNodeConfig)MXUtils.loadInit(new RServiNodeConfig(), this.context); |
| |
| try { |
| this.nodeFactory= RServiImpl.createLocalNodeFactory(this.id, this.context); |
| } |
| catch (final RjInvalidConfigurationException e) { |
| throw new RjInitFailedException("Creating local R node factory failed.", e); |
| } |
| try { |
| if (enableJM) { |
| this.jmxName= new ObjectName(this.jmBaseName + "type=Server"); |
| ManagementFactory.getPlatformMBeanServer().registerMBean(this, this.jmxName); |
| } |
| |
| this.jmNetConfig= this.currentNetConfig; |
| if (enableJM) { |
| this.jmNetConfig.initJM(); |
| } |
| |
| this.jmPoolConfig= new MXPoolConfig(this); |
| this.jmPoolConfig.load(this.currentPoolConfig); |
| if (enableJM) { |
| this.jmPoolConfig.initJM(); |
| } |
| |
| this.jmNodeConfig= new MXNodeConfig(this); |
| this.jmNodeConfig.load(this.currentNodeConfig); |
| if (enableJM) { |
| this.jmNodeConfig.initJM(); |
| } |
| |
| this.state= READY; |
| this.lifetimeController= new LifetimeController(); |
| this.lifetimeController.start(); |
| } |
| catch (final Exception e) { |
| try { |
| disposeServer(); |
| } |
| catch (final Exception e2) {} |
| throw new RjInitFailedException("Initializing JMX for pool server failed.", e); |
| } |
| try { |
| this.nodeFactory.setConfig(this.currentNodeConfig); |
| } |
| catch (final RjInvalidConfigurationException e) { |
| logWarning("Setting the initial R node config failed.", e); |
| } |
| } |
| |
| private void disposeServer() { |
| try { |
| if (this.jmPoolConfig != null) { |
| this.jmPoolConfig.disposeJM(); |
| } |
| if (this.jmNetConfig != null) { |
| this.jmNetConfig.disposeJM(); |
| } |
| if (this.jmNodeConfig != null) { |
| this.jmNodeConfig.disposeJM(); |
| } |
| |
| if (this.jmxName != null) { |
| ManagementFactory.getPlatformMBeanServer().unregisterMBean(this.jmxName); |
| this.jmxName= null; |
| } |
| } |
| catch (final Exception e) { |
| logError("An error occured when disposing JMX for pool server.", e); |
| } |
| finally { |
| this.state= DOWN; |
| } |
| } |
| |
| |
| @Override |
| public String getId() { |
| return this.id; |
| } |
| |
| @Override |
| public RJContext getRJContext() { |
| return this.context; |
| } |
| |
| @Override |
| public String getJMBaseName() { |
| return this.jmBaseName; |
| } |
| |
| @Override |
| public void getNetConfig(final NetConfig config) { |
| config.load(this.currentNetConfig); |
| } |
| |
| @Override |
| public void setNetConfig(NetConfig config) { |
| config= new NetConfig(config); |
| |
| if (!config.validate(null)) { |
| throw new IllegalArgumentException(); |
| } |
| this.currentNetConfig.load(config); |
| } |
| |
| @Override |
| public void getPoolConfig(final PoolConfig config) { |
| config.load(this.currentPoolConfig); |
| } |
| |
| @Override |
| public void setPoolConfig(PoolConfig config) { |
| config= new PoolConfig(config); // intern |
| |
| if (!config.validate(null)) { |
| throw new IllegalArgumentException(); |
| } |
| synchronized (this.jmPoolConfig) { |
| final PoolManager manager= this.poolManager; |
| if (manager != null) { |
| manager.setConfig(config); |
| } |
| |
| this.currentPoolConfig= config; |
| this.jmPoolConfig.load(config); |
| } |
| } |
| |
| @Override |
| public void getNodeConfig(final RServiNodeConfig config) { |
| config.load(this.currentNodeConfig); |
| } |
| |
| @Override |
| public void setNodeConfig(RServiNodeConfig config) throws RjInvalidConfigurationException { |
| config= new RServiNodeConfig(config); // intern |
| |
| if (!config.validate(null)) { |
| throw new IllegalArgumentException(); |
| } |
| synchronized (this.jmPoolConfig) { |
| this.nodeFactory.setConfig(config); |
| |
| this.currentNodeConfig= config; |
| this.jmNodeConfig.load(config); |
| } |
| } |
| |
| |
| private void initRMI() throws RjException, OperationsException { |
| final String hostAddress; |
| final int registryPort; |
| final boolean embed; |
| final boolean ssl; |
| synchronized (this.currentNetConfig) { |
| if (!MXUtils.validate(this.currentNetConfig)) { |
| return; |
| } |
| hostAddress= nonNullAssert(this.currentNetConfig.getEffectiveHostAddress()); |
| registryPort= this.currentNetConfig.getEffectiveRegistryPort(); |
| embed= this.currentNetConfig.getRegistryEmbed(); |
| ssl= this.currentNetConfig.isSSLEnabled(); |
| } |
| |
| this.rmiRegistry= null; |
| this.nodeFactory.setRegistry(null); |
| this.poolAddress= null; |
| |
| // RMI registry setup |
| if (System.getProperty("java.rmi.server.codebase") == null) { |
| try { |
| final List<BundleEntry> bundles= this.context.resolveBundles( |
| ImCollections.newList(ServerUtils.RJ_SERVER_SPEC, RServiUtils.RJ_SERVI_SPEC) ); |
| System.setProperty("java.rmi.server.codebase", ServerUtils.concatCodebase(bundles)); |
| } |
| catch (final StatusException e) { |
| throw new RjInvalidConfigurationException("Can not resolve bundles for Java codebase of server.", |
| e ); |
| } |
| } |
| |
| if (this.rmiHostnameSet || System.getProperty("java.rmi.server.hostname") == null) { |
| System.setProperty("java.rmi.server.hostname", hostAddress); |
| this.rmiHostnameSet= true; |
| } |
| |
| RMIAddress rmiRegistryAddress; |
| Registry registry; |
| try { |
| rmiRegistryAddress= new RMIAddress(hostAddress, registryPort, null); |
| final RMIClientSocketFactory csf= (ssl) ? new SslRMIClientSocketFactory() : null; |
| registry= LocateRegistry.getRegistry(null, registryPort, csf); |
| } |
| catch (final UnknownHostException e) { |
| throw new RjInvalidConfigurationException("Invalid RMI address.", e); |
| } |
| catch (final MalformedURLException e) { |
| throw new RjInvalidConfigurationException("Invalid RMI address.", e); |
| } |
| catch (final RemoteException e) { |
| throw new RjInitFailedException("Failed to reference local registry.", e); |
| } |
| RMIRegistry rmiRegistry= null; |
| if (embed) { |
| try { |
| rmiRegistry= new RMIRegistry(rmiRegistryAddress, registry, true); |
| if (this.rmiEmbeddedPorts.add(registryPort)) { |
| logWarning(String.format("Found running RMI registry at port %1$s, embedded RMI registry will not be started.", registryPort)); |
| } |
| } |
| catch (final RemoteException e) { |
| RMIRegistryManager.INSTANCE.setEmbeddedPrivateMode(false, ssl); |
| RMIRegistryManager.INSTANCE.setEmbeddedPrivatePort(registryPort); |
| try { |
| rmiRegistry= RMIRegistryManager.INSTANCE.getEmbeddedPrivateRegistry(new NullProgressMonitor()); |
| logInfo(String.format("Embedded RMI registry at port %1$s started.", registryPort)); |
| } |
| catch (final StatusException ee) { |
| logError(String.format("Failed to start embedded RMI registry at port %1$s.", registryPort), |
| ee ); |
| } |
| } |
| } |
| else { |
| try { |
| rmiRegistry= new RMIRegistry(rmiRegistryAddress, registry, true); |
| logInfo(String.format("Found running RMI registry at port %1$s.", registryPort)); |
| } |
| catch (final RemoteException e) { |
| logError(String.format("Failed to connect to RMI registry at port %1$s.", registryPort), |
| e ); |
| throw new RjInitFailedException("Initalization of RMI registry setup failed."); |
| } |
| } |
| |
| this.rmiRegistry= rmiRegistry; |
| this.nodeFactory.setRegistry(rmiRegistry); |
| this.poolAddress= NetConfig.getPoolAddress(hostAddress, registryPort, this.id); |
| } |
| |
| private void startManager() throws RjException { |
| final PoolManager poolManager= new PoolManager(this.id, this.rmiRegistry); |
| synchronized (this.jmPoolConfig) { |
| poolManager.setConfig(this.currentPoolConfig); |
| |
| poolManager.addNodeFactory(this.nodeFactory); |
| |
| poolManager.init(); |
| |
| this.poolManager= poolManager; |
| } |
| |
| MXNodeManager jmNodeManager= null; |
| if (this.jmIsNodeManagementEnabled) { |
| jmNodeManager= new MXNodeManager(this, poolManager); |
| jmNodeManager.activate(); |
| } |
| this.jmNodeManager= jmNodeManager; |
| } |
| |
| |
| private void stopManager() { |
| final var poolManager= this.poolManager; |
| this.poolManager= null; |
| if (poolManager != null && poolManager.isInitialized()) { |
| try { |
| poolManager.stop(0); |
| } |
| catch (final RjException e) { |
| logError("An error occured when stopping the pool manager.", e); |
| } |
| finally { |
| if (this.lifetimeController != null) { |
| this.lifetimeController.add(poolManager); |
| } |
| } |
| } |
| } |
| |
| |
| @Override |
| public @Nullable PoolManager getManager() { |
| return this.poolManager; |
| } |
| |
| @Override |
| public @Nullable String getPoolAddress() { |
| return this.poolAddress; |
| } |
| |
| @Override |
| public PoolStatusMX getPoolStatus() { |
| return new MXPoolStatus(this); |
| } |
| |
| @Override |
| public boolean isPoolNodeManagementEnabled() { |
| return this.jmIsNodeManagementEnabled; |
| } |
| |
| @Override |
| public synchronized void setPoolNodeManagementEnabled(final boolean enable) { |
| if (this.jmIsNodeManagementEnabled == enable) { |
| return; |
| } |
| this.jmIsNodeManagementEnabled= enable; |
| var jmNodeManager= this.jmNodeManager; |
| if (enable) { |
| if (jmNodeManager == null) { |
| final var poolManager= this.poolManager; |
| if (poolManager == null) { |
| return; |
| } |
| jmNodeManager= new MXNodeManager(this, poolManager); |
| this.jmNodeManager= jmNodeManager; |
| } |
| jmNodeManager.activate(); |
| } |
| else { |
| if (jmNodeManager != null) { |
| jmNodeManager.deactivate(); |
| } |
| } |
| } |
| |
| |
| @Override |
| public synchronized void start() throws OperationsException { |
| if (this.state == DOWN) { |
| throw new OperationsException("RServi pool server is shut down."); |
| } |
| try { |
| final var poolManager= this.poolManager; |
| if (poolManager != null) { |
| return; |
| } |
| |
| initRMI(); |
| |
| startManager(); |
| } |
| catch (final RjException e) { |
| logError("Failed to start RServi pool server.", e); |
| throw new OperationsException("Failed to start RServi pool server: " + e.getMessage()); |
| } |
| } |
| |
| @Override |
| public synchronized void stop() throws OperationsException { |
| stopManager(); |
| } |
| |
| @Override |
| public synchronized void restart() throws OperationsException { |
| stop(); |
| start(); |
| } |
| |
| public synchronized void shutdown() { |
| if (this.state == DOWN) { |
| return; |
| } |
| |
| stopManager(); |
| disposeServer(); |
| } |
| |
| public void waitForDisposal(final long timeoutMillis) throws InterruptedException { |
| if (this.lifetimeController != null) { |
| this.lifetimeController.join(timeoutMillis); |
| } |
| } |
| |
| |
| private void log(final byte severity, final String mainMessage, |
| final @Nullable Throwable e) { |
| final var message= new ToStringBuilder(mainMessage); |
| message.addProp("poolId", this.id); |
| CommonsRuntime.log( |
| Status.newStatus(severity, RServiUtils.RJ_SERVI_ID, message.toString(), e) ); |
| } |
| |
| private void logInfo(final String mainMessage) { |
| log(Status.INFO, mainMessage, null); |
| } |
| |
| private void logWarning(final String mainMessage, |
| final Throwable e) { |
| log(Status.WARNING, mainMessage, e); |
| } |
| |
| private void logWarning(final String mainMessage) { |
| log(Status.WARNING, mainMessage, null); |
| } |
| |
| private void logError(final String mainMessage, |
| final Throwable e) { |
| log(Status.ERROR, mainMessage, e); |
| } |
| |
| } |