blob: 7b7dd84cc76be09b0863525f9ffa6877deedb94e [file] [log] [blame]
/*=============================================================================#
# Copyright (c) 2009, 2020 Stephan Wahlbrink and others.
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
# which is available at https://www.apache.org/licenses/LICENSE-2.0.
#
# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
#
# Contributors:
# Stephan Wahlbrink <sw@wahlbrink.eu> - initial API and implementation
#=============================================================================*/
package org.eclipse.statet.internal.rj.servi;
import static org.eclipse.statet.jcommons.lang.ObjectUtils.nonNullAssert;
import java.rmi.Remote;
import java.rmi.server.RMIClientSocketFactory;
import java.rmi.server.RMIServerSocketFactory;
import java.rmi.server.UnicastRemoteObject;
import java.util.Collection;
import java.util.NoSuchElementException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.rmi.ssl.SslRMIClientSocketFactory;
import javax.rmi.ssl.SslRMIServerSocketFactory;
import org.eclipse.statet.jcommons.collections.CopyOnWriteIdentityListSet;
import org.eclipse.statet.jcommons.collections.ImCollection;
import org.eclipse.statet.jcommons.collections.ImCollections;
import org.eclipse.statet.jcommons.collections.ImList;
import org.eclipse.statet.jcommons.lang.NonNullByDefault;
import org.eclipse.statet.jcommons.lang.Nullable;
import org.eclipse.statet.jcommons.rmi.RMIRegistry;
import org.eclipse.statet.rj.RjException;
import org.eclipse.statet.rj.RjInitFailedException;
import org.eclipse.statet.rj.server.ServerLogin;
import org.eclipse.statet.rj.servi.RServi;
import org.eclipse.statet.rj.servi.node.RServiNodeFactory;
import org.eclipse.statet.rj.servi.node.RServiPool;
import org.eclipse.statet.rj.servi.pool.PoolConfig;
import org.eclipse.statet.rj.servi.pool.PoolNodeObject;
import org.eclipse.statet.rj.servi.pool.RServiPoolManager;
@NonNullByDefault
public class PoolManager implements RServiPool, RServiPoolManager {
private static final int STOP_ON_ERROR= 1 << 24;
private final String id;
private final @Nullable RMIRegistry registry;
private @Nullable Remote thisRemote;
private ImList<NodeFactory> nodeFactories= ImCollections.emptyList();
private final Stats stats;
private final CopyOnWriteIdentityListSet<PoolListener> poolListeners= new CopyOnWriteIdentityListSet<>();
private @Nullable APool2 pool;
private @Nullable APool2NodeFactory poolFactory;
private PoolConfig poolConfig;
private final Object resourceLock= new Object();
private final ScheduledThreadPoolExecutor executor;
private volatile int unclosed;
public PoolManager(final String id, final @Nullable RMIRegistry registry) {
this.id= nonNullAssert(id);
this.registry= registry;
this.stats= new Stats();
this.poolListeners.add(this.stats);
this.poolConfig= new PoolConfig();
this.executor= new ScheduledThreadPoolExecutor(0);
configResources();
Utils.preLoad();
}
@Override
public String getId() {
return this.id;
}
@Override
public ImCollection<? extends NodeFactory> getFactories() {
return this.nodeFactories;
}
@Override
public synchronized void addNodeFactory(final RServiNodeFactory factory) {
if (!this.nodeFactories.isEmpty()) {
throw new UnsupportedOperationException("Multiple factories are not supported");
}
nonNullAssert(factory);
this.nodeFactories= ImCollections.newList((NodeFactory)factory);
}
@Override
public synchronized void setConfig(final PoolConfig config) {
final var pool= this.pool;
if (pool != null) {
final var poolFactory= nonNullAssert(this.poolFactory);
pool.setConfig(config);
poolFactory.setMaxUsageCount(config.getMaxUsageCount());
}
this.poolConfig= config;
}
@Override
public PoolConfig getConfig() {
return this.poolConfig;
}
public void addPoolListener(final PoolListener listener) {
this.poolListeners.add(listener);
}
public void removePoolListener(final PoolListener listener) {
this.poolListeners.remove(listener);
}
@Override
public synchronized void init() throws RjException {
final var poolFactory= new APool2NodeFactory(
this.nodeFactories.get(0), this.poolListeners, this.executor );
poolFactory.setMaxUsageCount(this.poolConfig.getMaxUsageCount());
this.poolFactory= poolFactory;
final var pool= new APool2(poolFactory, this.poolConfig) {
@Override
protected void closeFinally() {
super.closeFinally();
onPoolClosed(this);
}
};
this.pool= pool;
onPoolCreated(pool);
Utils.logInfo("Publishing pool in registry...");
final var registry= this.registry;
if (registry != null) {
RMIClientSocketFactory clientSocketFactory= null;
RMIServerSocketFactory serverSocketFactory= null;
if (registry.getAddress().isSsl()) {
clientSocketFactory= new SslRMIClientSocketFactory();
serverSocketFactory= new SslRMIServerSocketFactory(null, null, true);
}
try {
final Remote remote= UnicastRemoteObject.exportObject(this, 0,
clientSocketFactory, serverSocketFactory );
this.thisRemote= remote;
registry.getRegistry().rebind(PoolConfig.getPoolName(this.id), remote);
}
catch (final Exception e) {
try {
stop(STOP_ON_ERROR);
}
catch (final Exception ignore) {}
Utils.logError("An error occurred when binding the pool in the registry.", e);
throw new RjInitFailedException("An error occurred when publishing the pool in the registry.");
}
}
}
public boolean isInitialized() {
return (this.pool != null);
}
public void checkInitialized() {
if (!isInitialized()) {
throw new IllegalStateException("not initialized");
}
}
@Override
public Collection<? extends PoolNodeObject> getPoolNodeObjects() {
checkInitialized();
final var poolFactory= nonNullAssert(this.poolFactory);
return poolFactory.getAllObjects();
}
public boolean isStopped() {
return (this.unclosed == 0
&& this.executor.getQueue().isEmpty() && this.executor.getPoolSize() == 0);
}
@Override
public synchronized void stop(final int mode) throws RjException {
Utils.logInfo("Unpublishing pool...");
final var registry= this.registry;
if (registry != null) {
try {
registry.getRegistry().unbind(PoolConfig.getPoolName(this.id));
}
catch (final Exception e) {
if ((mode & STOP_ON_ERROR) != 0) {
Utils.logError("An error occurred when unbinding the pool from the registry.", e);
}
}
}
if (this.thisRemote != null) {
try {
this.thisRemote= null;
UnicastRemoteObject.unexportObject(this, true);
}
catch (final Exception e) {
if ((mode & STOP_ON_ERROR) != 0) {
Utils.logError("An error occurred when unexport the pool.", e);
}
}
}
try {
Thread.sleep(500);
}
catch (final InterruptedException e) {}
final var pool= this.pool;
if (pool != null) {
Utils.logInfo("Closing R nodes...");
try {
pool.close(this.poolConfig.getEvictionTimeout());
}
catch (final Exception e) {
Utils.logError("An error occurred when closing the pool.", e);
}
finally {
Runtime.getRuntime().gc();
}
}
}
private void configResources() {
final int unclosed= this.unclosed;
if (unclosed > 0) {
this.executor.setKeepAliveTime(60, TimeUnit.SECONDS);
this.executor.setCorePoolSize(1);
this.executor.setMaximumPoolSize(2);
}
else {
this.executor.setKeepAliveTime(0, TimeUnit.SECONDS);
this.executor.setCorePoolSize(0);
this.executor.setMaximumPoolSize(2);
}
}
private void onPoolCreated(final APool2 pool) {
synchronized (this.resourceLock) {
if (this.unclosed++ == 0) {
configResources();
}
}
}
private void onPoolClosed(final APool2 pool) {
synchronized (this.resourceLock) {
if (--this.unclosed == 0) {
configResources();
}
}
}
@Override
public RServi getRServi(final String name, final @Nullable ServerLogin login)
throws NoSuchElementException, RjException {
final APool2NodeHandler handler= getPoolObject(name);
return new RServiImpl<>(handler.getAccessId(), handler, handler.getClientHandler());
}
private APool2NodeHandler getPoolObject(final String client)
throws NoSuchElementException, RjException {
checkInitialized();
final var pool= nonNullAssert(this.pool);
try {
return pool.borrowObject(client);
}
catch (final NoSuchElementException e) {
this.stats.logServRequestFailed(3);
throw new NoSuchElementException(Messages.GetRServi_NoInstance_pub_Pool_message);
}
catch (final Exception e) {
this.stats.logServRequestFailed(4);
Utils.logError(Messages.BindClient_error_message, e);
throw new RjException(Messages.GetRServi_pub_error_message);
}
}
@Override
public RServiPoolManager.Counter getCounter() {
checkInitialized();
final var pool= nonNullAssert(this.pool);
final var poolFactory= nonNullAssert(this.poolFactory);
final RServiPoolManager.Counter counter= new RServiPoolManager.Counter();
synchronized (pool) {
counter.numIdling= pool.getNumIdle();
counter.numInUse= pool.getNumActive();
counter.numTotal= counter.numIdling + counter.numTotal;
counter.maxIdling= -1;
counter.maxInUse= poolFactory.getStatMaxAllocated();
counter.maxTotal= poolFactory.getStatMaxTotal();
}
return counter;
}
}