| /*=============================================================================# |
| # Copyright (c) 2009, 2020 Stephan Wahlbrink and others. |
| # |
| # This program and the accompanying materials are made available under the |
| # terms of the Eclipse Public License 2.0 which is available at |
| # https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 |
| # which is available at https://www.apache.org/licenses/LICENSE-2.0. |
| # |
| # SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 |
| # |
| # Contributors: |
| # Stephan Wahlbrink <sw@wahlbrink.eu> - initial API and implementation |
| #=============================================================================*/ |
| |
| package org.eclipse.statet.internal.rj.servi; |
| |
| import static org.eclipse.statet.jcommons.lang.ObjectUtils.nonNullAssert; |
| |
| import java.rmi.Remote; |
| import java.rmi.server.RMIClientSocketFactory; |
| import java.rmi.server.RMIServerSocketFactory; |
| import java.rmi.server.UnicastRemoteObject; |
| import java.util.Collection; |
| import java.util.NoSuchElementException; |
| import java.util.concurrent.ScheduledThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| |
| import javax.rmi.ssl.SslRMIClientSocketFactory; |
| import javax.rmi.ssl.SslRMIServerSocketFactory; |
| |
| import org.eclipse.statet.jcommons.collections.CopyOnWriteIdentityListSet; |
| import org.eclipse.statet.jcommons.collections.ImCollection; |
| import org.eclipse.statet.jcommons.collections.ImCollections; |
| import org.eclipse.statet.jcommons.collections.ImList; |
| import org.eclipse.statet.jcommons.lang.NonNullByDefault; |
| import org.eclipse.statet.jcommons.lang.Nullable; |
| import org.eclipse.statet.jcommons.rmi.RMIRegistry; |
| |
| import org.eclipse.statet.rj.RjException; |
| import org.eclipse.statet.rj.RjInitFailedException; |
| import org.eclipse.statet.rj.server.ServerLogin; |
| import org.eclipse.statet.rj.servi.RServi; |
| import org.eclipse.statet.rj.servi.node.RServiNodeFactory; |
| import org.eclipse.statet.rj.servi.node.RServiPool; |
| import org.eclipse.statet.rj.servi.pool.PoolConfig; |
| import org.eclipse.statet.rj.servi.pool.PoolNodeObject; |
| import org.eclipse.statet.rj.servi.pool.RServiPoolManager; |
| |
| |
| @NonNullByDefault |
| public class PoolManager implements RServiPool, RServiPoolManager { |
| |
| |
| private static final int STOP_ON_ERROR= 1 << 24; |
| |
| |
| private final String id; |
| |
| private final @Nullable RMIRegistry registry; |
| |
| private @Nullable Remote thisRemote; |
| |
| private ImList<NodeFactory> nodeFactories= ImCollections.emptyList(); |
| |
| private final Stats stats; |
| private final CopyOnWriteIdentityListSet<PoolListener> poolListeners= new CopyOnWriteIdentityListSet<>(); |
| |
| private @Nullable APool2 pool; |
| private @Nullable APool2NodeFactory poolFactory; |
| private PoolConfig poolConfig; |
| |
| private final Object resourceLock= new Object(); |
| private final ScheduledThreadPoolExecutor executor; |
| private volatile int unclosed; |
| |
| |
| public PoolManager(final String id, final @Nullable RMIRegistry registry) { |
| this.id= nonNullAssert(id); |
| this.registry= registry; |
| |
| this.stats= new Stats(); |
| this.poolListeners.add(this.stats); |
| |
| this.poolConfig= new PoolConfig(); |
| |
| this.executor= new ScheduledThreadPoolExecutor(0); |
| configResources(); |
| |
| Utils.preLoad(); |
| } |
| |
| |
| @Override |
| public String getId() { |
| return this.id; |
| } |
| |
| @Override |
| public ImCollection<? extends NodeFactory> getFactories() { |
| return this.nodeFactories; |
| } |
| |
| @Override |
| public synchronized void addNodeFactory(final RServiNodeFactory factory) { |
| if (!this.nodeFactories.isEmpty()) { |
| throw new UnsupportedOperationException("Multiple factories are not supported"); |
| } |
| nonNullAssert(factory); |
| this.nodeFactories= ImCollections.newList((NodeFactory)factory); |
| } |
| |
| @Override |
| public synchronized void setConfig(final PoolConfig config) { |
| final var pool= this.pool; |
| if (pool != null) { |
| final var poolFactory= nonNullAssert(this.poolFactory); |
| pool.setConfig(config); |
| poolFactory.setMaxUsageCount(config.getMaxUsageCount()); |
| } |
| this.poolConfig= config; |
| } |
| |
| @Override |
| public PoolConfig getConfig() { |
| return this.poolConfig; |
| } |
| |
| public void addPoolListener(final PoolListener listener) { |
| this.poolListeners.add(listener); |
| } |
| |
| public void removePoolListener(final PoolListener listener) { |
| this.poolListeners.remove(listener); |
| } |
| |
| @Override |
| public synchronized void init() throws RjException { |
| final var poolFactory= new APool2NodeFactory( |
| this.nodeFactories.get(0), this.poolListeners, this.executor ); |
| poolFactory.setMaxUsageCount(this.poolConfig.getMaxUsageCount()); |
| this.poolFactory= poolFactory; |
| final var pool= new APool2(poolFactory, this.poolConfig) { |
| @Override |
| protected void closeFinally() { |
| super.closeFinally(); |
| onPoolClosed(this); |
| } |
| }; |
| this.pool= pool; |
| onPoolCreated(pool); |
| |
| Utils.logInfo("Publishing pool in registry..."); |
| final var registry= this.registry; |
| if (registry != null) { |
| RMIClientSocketFactory clientSocketFactory= null; |
| RMIServerSocketFactory serverSocketFactory= null; |
| if (registry.getAddress().isSsl()) { |
| clientSocketFactory= new SslRMIClientSocketFactory(); |
| serverSocketFactory= new SslRMIServerSocketFactory(null, null, true); |
| } |
| try { |
| final Remote remote= UnicastRemoteObject.exportObject(this, 0, |
| clientSocketFactory, serverSocketFactory ); |
| this.thisRemote= remote; |
| registry.getRegistry().rebind(PoolConfig.getPoolName(this.id), remote); |
| } |
| catch (final Exception e) { |
| try { |
| stop(STOP_ON_ERROR); |
| } |
| catch (final Exception ignore) {} |
| Utils.logError("An error occurred when binding the pool in the registry.", e); |
| throw new RjInitFailedException("An error occurred when publishing the pool in the registry."); |
| } |
| } |
| } |
| |
| public boolean isInitialized() { |
| return (this.pool != null); |
| } |
| |
| public void checkInitialized() { |
| if (!isInitialized()) { |
| throw new IllegalStateException("not initialized"); |
| } |
| } |
| |
| @Override |
| public Collection<? extends PoolNodeObject> getPoolNodeObjects() { |
| checkInitialized(); |
| final var poolFactory= nonNullAssert(this.poolFactory); |
| |
| return poolFactory.getAllObjects(); |
| } |
| |
| public boolean isStopped() { |
| return (this.unclosed == 0 |
| && this.executor.getQueue().isEmpty() && this.executor.getPoolSize() == 0); |
| } |
| |
| @Override |
| public synchronized void stop(final int mode) throws RjException { |
| Utils.logInfo("Unpublishing pool..."); |
| final var registry= this.registry; |
| if (registry != null) { |
| try { |
| registry.getRegistry().unbind(PoolConfig.getPoolName(this.id)); |
| } |
| catch (final Exception e) { |
| if ((mode & STOP_ON_ERROR) != 0) { |
| Utils.logError("An error occurred when unbinding the pool from the registry.", e); |
| } |
| } |
| } |
| if (this.thisRemote != null) { |
| try { |
| this.thisRemote= null; |
| UnicastRemoteObject.unexportObject(this, true); |
| } |
| catch (final Exception e) { |
| if ((mode & STOP_ON_ERROR) != 0) { |
| Utils.logError("An error occurred when unexport the pool.", e); |
| } |
| } |
| } |
| |
| try { |
| Thread.sleep(500); |
| } |
| catch (final InterruptedException e) {} |
| final var pool= this.pool; |
| if (pool != null) { |
| Utils.logInfo("Closing R nodes..."); |
| try { |
| pool.close(this.poolConfig.getEvictionTimeout()); |
| } |
| catch (final Exception e) { |
| Utils.logError("An error occurred when closing the pool.", e); |
| } |
| finally { |
| Runtime.getRuntime().gc(); |
| } |
| } |
| } |
| |
| |
| private void configResources() { |
| final int unclosed= this.unclosed; |
| if (unclosed > 0) { |
| this.executor.setKeepAliveTime(60, TimeUnit.SECONDS); |
| this.executor.setCorePoolSize(1); |
| this.executor.setMaximumPoolSize(2); |
| } |
| else { |
| this.executor.setKeepAliveTime(0, TimeUnit.SECONDS); |
| this.executor.setCorePoolSize(0); |
| this.executor.setMaximumPoolSize(2); |
| } |
| } |
| |
| private void onPoolCreated(final APool2 pool) { |
| synchronized (this.resourceLock) { |
| if (this.unclosed++ == 0) { |
| configResources(); |
| } |
| } |
| } |
| |
| private void onPoolClosed(final APool2 pool) { |
| synchronized (this.resourceLock) { |
| if (--this.unclosed == 0) { |
| configResources(); |
| } |
| } |
| } |
| |
| |
| @Override |
| public RServi getRServi(final String name, final @Nullable ServerLogin login) |
| throws NoSuchElementException, RjException { |
| final APool2NodeHandler handler= getPoolObject(name); |
| return new RServiImpl<>(handler.getAccessId(), handler, handler.getClientHandler()); |
| } |
| |
| private APool2NodeHandler getPoolObject(final String client) |
| throws NoSuchElementException, RjException { |
| checkInitialized(); |
| final var pool= nonNullAssert(this.pool); |
| try { |
| |
| return pool.borrowObject(client); |
| } |
| catch (final NoSuchElementException e) { |
| this.stats.logServRequestFailed(3); |
| throw new NoSuchElementException(Messages.GetRServi_NoInstance_pub_Pool_message); |
| } |
| catch (final Exception e) { |
| this.stats.logServRequestFailed(4); |
| Utils.logError(Messages.BindClient_error_message, e); |
| throw new RjException(Messages.GetRServi_pub_error_message); |
| } |
| } |
| |
| @Override |
| public RServiPoolManager.Counter getCounter() { |
| checkInitialized(); |
| final var pool= nonNullAssert(this.pool); |
| final var poolFactory= nonNullAssert(this.poolFactory); |
| |
| final RServiPoolManager.Counter counter= new RServiPoolManager.Counter(); |
| synchronized (pool) { |
| counter.numIdling= pool.getNumIdle(); |
| counter.numInUse= pool.getNumActive(); |
| counter.numTotal= counter.numIdling + counter.numTotal; |
| counter.maxIdling= -1; |
| counter.maxInUse= poolFactory.getStatMaxAllocated(); |
| counter.maxTotal= poolFactory.getStatMaxTotal(); |
| } |
| return counter; |
| } |
| |
| } |