Bug 567221: [RJ-Servi] (Re)add detection of lost clients
Change-Id: I394aa96c1d965ab397f2b1a079e6774ef2d1199d
diff --git a/core/org.eclipse.statet.rj.client/src/org/eclipse/statet/rj/server/client/AbstractRJComClient.java b/core/org.eclipse.statet.rj.client/src/org/eclipse/statet/rj/server/client/AbstractRJComClient.java
index 311d6c6..6ab84f3 100644
--- a/core/org.eclipse.statet.rj.client/src/org/eclipse/statet/rj/server/client/AbstractRJComClient.java
+++ b/core/org.eclipse.statet.rj.client/src/org/eclipse/statet/rj/server/client/AbstractRJComClient.java
@@ -242,7 +242,7 @@
private List<Runnable> defferedRunnables;
private boolean closed;
- private ScheduledFuture<?> keepAliveJob;
+ private ScheduledFuture<?> periodicCheckJob;
private String closedMessage= "Connection to R engine is closed.";
private final Lock clientWaitLock= new ReentrantLock();
@@ -287,16 +287,15 @@
synchronized (this) {
this.rjConsoleEngine= rjServer;
if (client == 0) {
- long t= 45 * 1000;
+ int t= 45 * 1000;
try {
final String p= System.getProperty("org.eclipse.statet.rj.client.keepaliveInterval"); //$NON-NLS-1$
if (p != null && p.length() > 0) {
- t= Long.parseLong(p);
+ t= Integer.parseInt(p);
}
}
catch (final Exception e) {}
- this.keepAliveJob= RJHelper_EXECUTOR.scheduleWithFixedDelay(
- new KeepAliveRunnable(), t, t, TimeUnit.MILLISECONDS );
+ initPeriodicCheck(new KeepAliveRunnable(), t);
}
runnables= this.defferedRunnables;
this.defferedRunnables= null;
@@ -308,6 +307,11 @@
}
}
+ public void initPeriodicCheck(final Runnable runnable, final int intervalMillis) {
+ this.periodicCheckJob= RJHelper_EXECUTOR.scheduleWithFixedDelay(
+ runnable, 1000, intervalMillis, TimeUnit.MILLISECONDS );
+ }
+
public Object getRHandle() {
return this.rHandle;
}
@@ -354,9 +358,9 @@
if (this.closed != closed) {
this.closed= closed;
if (closed) {
- final ScheduledFuture<?> job= this.keepAliveJob;
+ final ScheduledFuture<?> job= this.periodicCheckJob;
if (job != null) {
- this.keepAliveJob= null;
+ this.periodicCheckJob= null;
job.cancel(true);
}
}
diff --git a/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/Accessor.java b/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/Accessor.java
new file mode 100644
index 0000000..202af6f
--- /dev/null
+++ b/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/Accessor.java
@@ -0,0 +1,70 @@
+/*=============================================================================#
+ # Copyright (c) 2021 Stephan Wahlbrink and others.
+ #
+ # This program and the accompanying materials are made available under the
+ # terms of the Eclipse Public License 2.0 which is available at
+ # https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ # which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ #
+ # SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ #
+ # Contributors:
+ # Stephan Wahlbrink <sw@wahlbrink.eu> - initial API and implementation
+ #=============================================================================*/
+
+package org.eclipse.statet.rj.servi;
+
+import static org.eclipse.statet.jcommons.lang.ObjectUtils.nonNullAssert;
+
+import java.lang.reflect.Field;
+
+import org.eclipse.statet.jcommons.lang.NonNullByDefault;
+import org.eclipse.statet.jcommons.lang.Nullable;
+
+
+@NonNullByDefault
+public class Accessor {
+
+
+ private Class<?> clazz;
+ private final Object instance;
+
+
+ public Accessor(final Object instance, final Class<?> clazz) {
+ this.instance= nonNullAssert(instance);
+ this.clazz= nonNullAssert(clazz);
+ }
+
+ public Accessor(final Object instance, final String className, final ClassLoader classLoader) {
+ this.instance= nonNullAssert(instance);
+ try {
+ this.clazz= Class.forName(className, true, classLoader);
+ }
+ catch (ClassNotFoundException | ExceptionInInitializerError e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+ public Field getField(final String fieldName) {
+ try {
+ final Field field= this.clazz.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ return field;
+ }
+ catch (SecurityException | NoSuchFieldException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public @Nullable Object getFieldObj(final String fieldName) {
+ try {
+ final Field field= getField(fieldName);
+ return field.get(this.instance);
+ }
+ catch (IllegalArgumentException | IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
diff --git a/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/JMPoolTest.java b/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/JMPoolTest.java
index 606263e..b4efd31 100644
--- a/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/JMPoolTest.java
+++ b/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/JMPoolTest.java
@@ -22,15 +22,20 @@
import static org.eclipse.statet.jcommons.lang.ObjectUtils.nonNullAssert;
import static org.eclipse.statet.jcommons.lang.ObjectUtils.nonNullLateInit;
+import static org.eclipse.statet.internal.rj.servi.APool2.CLIENT_CHECK_INTERVAL_MILLIS_PROPERTY_KEY;
+
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
import javax.security.auth.login.LoginException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
@@ -42,8 +47,11 @@
import org.eclipse.statet.jcommons.status.Status;
import org.eclipse.statet.jcommons.status.StatusException;
+import org.eclipse.statet.internal.rj.servi.RServiImpl;
import org.eclipse.statet.rj.data.RDataUtils;
+import org.eclipse.statet.rj.server.client.AbstractRJComClient;
import org.eclipse.statet.rj.server.util.RJContext;
+import org.eclipse.statet.rj.servi.Accessor;
import org.eclipse.statet.rj.servi.RServi;
import org.eclipse.statet.rj.servi.RServiUtils;
import org.eclipse.statet.rj.servi.jmx.PoolStatusMX;
@@ -293,6 +301,70 @@
@Test
+ public void ClientCheck_alive() throws Exception {
+ final ProgressMonitor m= new NullProgressMonitor();
+ final int intervalSec= 10;
+ System.setProperty(CLIENT_CHECK_INTERVAL_MILLIS_PROPERTY_KEY,
+ Long.toString(TimeUnit.SECONDS.toMillis(intervalSec)) );
+ try {
+ final PoolConfig poolConfig= new PoolConfig();
+ poolConfig.setMaxTotalCount(1);
+ poolConfig.setMaxUsageCount(1);
+ this.server.setPoolConfig(poolConfig);
+ this.server.start();
+
+ final RServi servi1= getServi("test1");
+ assertNodeOperative(servi1);
+
+ final PoolNodeObject node1= assertSinglePoolNode();
+
+ awaitStateCondition(node1, (state) -> (state != PoolNodeState.ALLOCATED),
+ (int)(2.5 * intervalSec) + WAIT_IDLE_SEC );
+ skipChecking(node1);
+ assertEquals(PoolNodeState.ALLOCATED, node1.getState());
+ assertNodeOperative(servi1);
+ }
+ finally {
+ System.clearProperty(CLIENT_CHECK_INTERVAL_MILLIS_PROPERTY_KEY);
+ }
+ }
+
+ @Test
+ public void ClientCheck_dead() throws Exception {
+ final ProgressMonitor m= new NullProgressMonitor();
+ final int intervalSec= 10;
+ System.setProperty(CLIENT_CHECK_INTERVAL_MILLIS_PROPERTY_KEY,
+ Long.toString(TimeUnit.SECONDS.toMillis(intervalSec)) );
+ try {
+ final PoolConfig poolConfig= new PoolConfig();
+ poolConfig.setMaxTotalCount(1);
+ poolConfig.setMaxUsageCount(1);
+ this.server.setPoolConfig(poolConfig);
+ this.server.start();
+
+ final RServi servi1= getServi("test1");
+ assertNodeOperative(servi1);
+
+ final PoolNodeObject node1= assertSinglePoolNode();
+
+ final Accessor servi1Impl= new Accessor(servi1, RServiImpl.class);
+ final Accessor rjsClient= new Accessor(servi1Impl.getFieldObj("rjs"), AbstractRJComClient.class);
+ final ScheduledFuture<?> clientCheckJob= (ScheduledFuture<?>)rjsClient.getFieldObj("periodicCheckJob");
+ clientCheckJob.cancel(false);
+
+ awaitStateCondition(node1, (state) -> (state != PoolNodeState.ALLOCATED),
+ (int)(2.5 * intervalSec) + WAIT_IDLE_SEC );
+ skipChecking(node1);
+ assertEquals(PoolNodeState.DISPOSED, node1.getState());
+ Assertions.assertThrows(StatusException.class, () -> assertNodeOperative(servi1));
+ }
+ finally {
+ System.clearProperty(CLIENT_CHECK_INTERVAL_MILLIS_PROPERTY_KEY);
+ }
+ }
+
+
+ @Test
public void PoolNode_getState() throws Exception {
final ProgressMonitor m= new NullProgressMonitor();
@@ -379,12 +451,17 @@
}
private void skipChecking(final PoolNodeObject node) throws InterruptedException {
+ awaitStateCondition(node, (state) -> (state != PoolNodeState.CHECKING), WAIT_IDLE_SEC);
+ }
+
+ private void awaitStateCondition(final PoolNodeObject node,
+ final Predicate<PoolNodeState> condition, final int timeoutSec) throws InterruptedException {
final long t= System.nanoTime();
while (true) {
- if (node.getState() != PoolNodeState.CHECKING) {
+ if (condition.test(node.getState())) {
return;
}
- if (TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - t) > WAIT_IDLE_SEC) {
+ if (TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - t) > timeoutSec) {
return;
}
Thread.sleep(100);
diff --git a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeManager.java b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeManager.java
index 71b76d9..af6c51c 100644
--- a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeManager.java
+++ b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeManager.java
@@ -26,6 +26,7 @@
import org.eclipse.statet.jcommons.runtime.CommonsRuntime;
import org.eclipse.statet.jcommons.status.ErrorStatus;
+import org.eclipse.statet.rj.RjClosedException;
import org.eclipse.statet.rj.RjException;
import org.eclipse.statet.rj.RjInitFailedException;
import org.eclipse.statet.rj.servi.RServi;
@@ -41,6 +42,16 @@
private class ThisNodeHandler extends NodeHandler implements RServiImpl.PoolRef {
@Override
+ public int getCheckIntervalMillis() throws RemoteException {
+ return 0;
+ }
+
+ @Override
+ public void check(final long accessId) throws RjException, RemoteException {
+ checkRServi(accessId);
+ }
+
+ @Override
public void returnObject(final long accessId) throws RjException, RemoteException {
returnRServi(accessId);
}
@@ -139,12 +150,18 @@
}
}
- private synchronized void returnRServi(final long accessId) {
+ private synchronized void checkRServi(final long accessId) throws RjClosedException {
+ if (this.accessId != accessId) {
+ throw new RjClosedException("RServi instance is no longer valid.");
+ }
+ }
+
+ private synchronized void returnRServi(final long accessId) throws RjClosedException {
if (this.handler == null) {
return;
}
if (this.accessId != accessId) {
- throw new IllegalStateException("Access id no longer valid.");
+ throw new RjClosedException("RServi instance is no longer valid.");
}
try {
final ThisNodeHandler handler= nonNullAssert(this.handler);
diff --git a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/RServiImpl.java b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/RServiImpl.java
index 3faefc5..d54b83d 100644
--- a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/RServiImpl.java
+++ b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/RServiImpl.java
@@ -14,6 +14,8 @@
package org.eclipse.statet.internal.rj.servi;
+import static org.eclipse.statet.rj.servi.RServiUtils.RJ_SERVI_ID;
+
import java.io.Externalizable;
import java.io.IOException;
import java.io.InputStream;
@@ -33,8 +35,10 @@
import org.eclipse.statet.jcommons.status.ProgressMonitor;
import org.eclipse.statet.jcommons.status.Status;
import org.eclipse.statet.jcommons.status.StatusException;
+import org.eclipse.statet.jcommons.status.WarningStatus;
import org.eclipse.statet.internal.rj.servi.server.RServiBackend;
+import org.eclipse.statet.rj.RjClosedException;
import org.eclipse.statet.rj.RjException;
import org.eclipse.statet.rj.data.REnvironment;
import org.eclipse.statet.rj.data.RObject;
@@ -49,7 +53,6 @@
import org.eclipse.statet.rj.server.client.RClientGraphicFactory;
import org.eclipse.statet.rj.server.client.RGraphicCreatorImpl;
import org.eclipse.statet.rj.servi.RServi;
-import org.eclipse.statet.rj.servi.RServiUtils;
import org.eclipse.statet.rj.services.BasicFQRObject;
import org.eclipse.statet.rj.services.FQRObject;
import org.eclipse.statet.rj.services.FunctionCall;
@@ -95,17 +98,17 @@
case Server.S_STOPPED:
break;
case RjsStatus.ERROR:
- throw new StatusException(new ErrorStatus(RServiUtils.RJ_SERVI_ID, "Server or IO error."));
+ throw new StatusException(new ErrorStatus(RJ_SERVI_ID, "Server or IO error."));
default:
throw new IllegalStateException();
}
if (!isClosed()) {
setClosed(true);
- handleStatus(new InfoStatus(RServiUtils.RJ_SERVI_ID, "RServi is disconnected."),
+ handleStatus(new InfoStatus(RJ_SERVI_ID, "RServi is disconnected."),
m );
}
- throw new StatusException(new ErrorStatus(RServiUtils.RJ_SERVI_ID, "RServi is closed."));
+ throw new StatusException(new ErrorStatus(RJ_SERVI_ID, "RServi is closed."));
}
@Override
@@ -124,7 +127,14 @@
public static interface PoolRef extends Remote {
- void returnObject(long accessId) throws RjException, RemoteException;
+
+
+ int getCheckIntervalMillis() throws RemoteException;
+
+ void check(final long accessId) throws RjException, RemoteException;
+
+ void returnObject(final long accessId) throws RjException, RemoteException;
+
}
@@ -139,10 +149,7 @@
public RServiImpl(final long accessId, final PoolRef ref, final RServiBackend backend) {
- this.accessId= accessId;
- this.poolRef= ref;
- this.backend= backend;
- this.rjs.setServer(this.backend, 1);
+ init(accessId, ref, backend);
}
@SuppressWarnings("null") // init in ::readExternal
@@ -151,9 +158,13 @@
@Override
public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
- this.accessId= in.readLong();
- this.poolRef= (PoolRef)in.readObject();
- this.backend= (RServiBackend)in.readObject();
+ init(in.readLong(), (PoolRef)in.readObject(), (RServiBackend)in.readObject());
+ }
+
+ private void init(final long accessId, final PoolRef poolRef, final RServiBackend backend) {
+ this.accessId= accessId;
+ this.poolRef= poolRef;
+ this.backend= backend;
this.rjs.setServer(this.backend, 1);
}
@@ -177,11 +188,24 @@
}
- private void init() throws StatusException {
+ private synchronized void init() throws StatusException {
this.rjsId= RjsComConfig.registerClientComHandler(this.rjs);
- final Map<String, Object> properties= new HashMap<>();
- this.rjs.initClient(this.rHandle, this, properties, this.rjsId);
- this.rjs.setRjsProperties(properties);
+ try {
+ final Map<String, Object> properties= new HashMap<>();
+ this.rjs.initClient(this.rHandle, this, properties, this.rjsId);
+ this.rjs.setRjsProperties(properties);
+
+ final int checkIntervalMillis= this.poolRef.getCheckIntervalMillis();
+ if (checkIntervalMillis > 0) {
+ this.rjs.initPeriodicCheck(this::checkPool, checkIntervalMillis);
+ }
+ }
+ catch (final Exception e) {
+ forceClose();
+ throw new StatusException(new ErrorStatus(RJ_SERVI_ID,
+ "An error occurred when initializing RServi instance.",
+ e ));
+ }
}
protected void dispose() {
@@ -192,6 +216,35 @@
this.rjs.disposeAllGraphics();
}
+ private void checkPool() {
+ final PoolRef poolRef= this.poolRef;
+ if (poolRef == null) {
+ return;
+ }
+ try {
+ poolRef.check(this.accessId);
+ }
+ catch (final RjClosedException e) {
+ synchronized (this) {
+ if (!isClosed()) {
+ CommonsRuntime.log(new WarningStatus(RJ_SERVI_ID,
+ "Closing of RServi instance enforced by RServi pool.",
+ e ));
+ forceClose();
+ }
+ }
+ }
+ catch (final Exception e) {
+ synchronized (this) {
+ if (!isClosed()) {
+ CommonsRuntime.log(new ErrorStatus(RJ_SERVI_ID,
+ "An error occurred when running RServi pool check.",
+ e ));
+ }
+ }
+ }
+ }
+
@Override
public boolean isClosed() {
return this.rjs.isClosed();
@@ -200,7 +253,7 @@
@Override
public synchronized void close() throws StatusException {
if (this.rjs.isClosed()) {
- throw new StatusException(new ErrorStatus(RServiUtils.RJ_SERVI_ID,
+ throw new StatusException(new ErrorStatus(RJ_SERVI_ID,
"RServi is already closed." ));
}
try {
@@ -208,7 +261,24 @@
this.poolRef.returnObject(this.accessId);
}
catch (final Exception e) {
- throw new StatusException(new ErrorStatus(RServiUtils.RJ_SERVI_ID,
+ throw new StatusException(new ErrorStatus(RJ_SERVI_ID,
+ "An error occurred when closing RServi instance.",
+ e ));
+ }
+ finally {
+ dispose();
+ }
+ }
+
+ private void forceClose() {
+ try {
+ this.rjs.setClosed(true);
+ this.poolRef.returnObject(this.accessId);
+ }
+ catch (final RjClosedException e) {
+ }
+ catch (final Exception e) {
+ CommonsRuntime.log(new ErrorStatus(RJ_SERVI_ID,
"An error occurred when closing RServi instance.",
e ));
}
diff --git a/servi/org.eclipse.statet.rj.servi/srcServiPool/org/eclipse/statet/internal/rj/servi/APool2.java b/servi/org.eclipse.statet.rj.servi/srcServiPool/org/eclipse/statet/internal/rj/servi/APool2.java
index 3b33f4d..c148fdc 100644
--- a/servi/org.eclipse.statet.rj.servi/srcServiPool/org/eclipse/statet/internal/rj/servi/APool2.java
+++ b/servi/org.eclipse.statet.rj.servi/srcServiPool/org/eclipse/statet/internal/rj/servi/APool2.java
@@ -15,6 +15,7 @@
package org.eclipse.statet.internal.rj.servi;
import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.PooledObjectState;
import org.apache.commons.pool2.impl.DefaultEvictionPolicy;
import org.apache.commons.pool2.impl.EvictionConfig;
import org.apache.commons.pool2.impl.EvictionPolicy;
@@ -31,10 +32,14 @@
public class APool2 extends GenericObjectPool<APool2NodeHandler> {
+ public static final String CLIENT_CHECK_INTERVAL_MILLIS_PROPERTY_KEY= "org.eclipse.statet.rj.servi.pool.ClientCheck.Interval.millis"; //$NON-NLS-1$
+
+
private static final byte CLOSING= 2;
private static final byte CLOSING_FINALLY= 3;
private static final byte CLOSED= 4;
+
private static final EvictionPolicy<APool2NodeHandler> EVICTION_POLICY= new DefaultEvictionPolicy<>() {
@Override
@@ -73,6 +78,8 @@
private final APool2NodeFactory factory;
+ private int clientCheckIntervalMillis;
+
private volatile byte state;
private boolean evicting;
private final Object stateLock= new Object();
@@ -81,6 +88,9 @@
public APool2(final String id, final APool2NodeFactory factory, final PoolConfig config) {
super(factory, createAConfig(config));
this.id= id;
+ { final String s= System.getProperty(CLIENT_CHECK_INTERVAL_MILLIS_PROPERTY_KEY);
+ this.clientCheckIntervalMillis= (s != null) ? Integer.parseInt(s) : 60000;
+ }
factory.setPool(this);
this.factory= factory;
@@ -104,6 +114,10 @@
return super.getMaxIdle();
}
+ public int getClientCheckIntervalMillis() {
+ return this.clientCheckIntervalMillis;
+ }
+
public APool2NodeHandler borrowObject(final String client) throws Exception {
if (this.state != 0) {
@@ -183,10 +197,12 @@
final ImList<APool2NodeHandler> objects= this.factory.getAllObjects();
for (final APool2NodeHandler poolObj : objects) {
- if (poolObj.isEvictRequested(nanos)) {
+ poolObj.checkClientLost(nanos);
+ if (poolObj.isEvictRequested(nanos)
+ && poolObj.getPooledObject().getState() != PooledObjectState.INVALID) {
try {
- evicted++;
invalidateObject(poolObj);
+ evicted++;
}
catch (final Exception e) {}
}
diff --git a/servi/org.eclipse.statet.rj.servi/srcServiPool/org/eclipse/statet/internal/rj/servi/APool2NodeHandler.java b/servi/org.eclipse.statet.rj.servi/srcServiPool/org/eclipse/statet/internal/rj/servi/APool2NodeHandler.java
index 0a66799..7de6298 100644
--- a/servi/org.eclipse.statet.rj.servi/srcServiPool/org/eclipse/statet/internal/rj/servi/APool2NodeHandler.java
+++ b/servi/org.eclipse.statet.rj.servi/srcServiPool/org/eclipse/statet/internal/rj/servi/APool2NodeHandler.java
@@ -18,11 +18,13 @@
import java.rmi.RemoteException;
import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.PooledObjectState;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.eclipse.statet.jcommons.lang.NonNullByDefault;
import org.eclipse.statet.jcommons.lang.Nullable;
+import org.eclipse.statet.rj.RjClosedException;
import org.eclipse.statet.rj.RjException;
import org.eclipse.statet.rj.servi.pool.PoolNodeObject;
import org.eclipse.statet.rj.servi.pool.PoolNodeState;
@@ -53,7 +55,10 @@
@Nullable Remote thisRemote;
- private volatile long accessId;
+ private volatile long accessId= -1;
+
+ private int checkInterval;
+ private long checkTimestamp;
public APool2NodeHandler(final APool2 pool) {
@@ -132,11 +137,13 @@
super.bindClient(name, host);
synchronized (this) {
this.accessId= this.p.getBorrowedCount();
+ this.checkInterval= this.pool.getClientCheckIntervalMillis();
+ this.checkTimestamp= System.nanoTime();
}
}
void invalidateClient() {
- this.accessId= -1L;
+ this.accessId= -1;
setClientLabel(null);
}
@@ -155,7 +162,7 @@
void doEvict(final long nanos, final boolean direct) {
synchronized (this.p) {
- if (this.evict == 0 || this.evict > nanos) {
+ if (this.evict == 0 || this.evict - nanos > 0) {
this.evict= safeNanos(nanos);
}
}
@@ -171,23 +178,55 @@
boolean isEvictRequested(final long nanos) {
final long evict= this.evict;
return (evict != 0
- && (nanos == 0 || nanos >= evict) );
+ && (nanos == 0 || nanos - evict >= 0) );
}
long getAccessId() {
final long accessId= this.accessId;
- if (accessId == -1L) {
+ if (accessId == -1) {
throw new IllegalAccessError();
}
return accessId;
}
@Override
+ public int getCheckIntervalMillis() throws RemoteException {
+ return this.checkInterval;
+ }
+
+ @Override
+ public synchronized void check(final long accessId) throws RjException, RemoteException {
+ if (this.accessId != accessId) {
+ throw new RjClosedException("RServi instance is no longer valid.");
+ }
+ this.checkTimestamp= System.nanoTime();
+ }
+
+ void checkClientLost(final long nanos) {
+ if (this.accessId == -1
+ || nanos - this.checkTimestamp < (long)(2.1 * this.checkInterval + 1000) * 1_000_000
+ || getPooledObject().getState() == PooledObjectState.INVALID ) {
+ return;
+ }
+ String clientLabel;
+ synchronized (this) {
+ if (this.accessId == -1
+ || nanos - this.checkTimestamp < (long)(2.1 * this.checkInterval + 1000) * 1_000_000 ) {
+ return;
+ }
+ clientLabel= getClientLabel();
+ doEvict(nanos, false);
+ }
+ Utils.logInfo(String.format("Abandoned RServi instance (client= %1$s) detected and marked for eviction.",
+ (clientLabel != null) ? '\'' + clientLabel + '\'' : "<no label>" ));
+ }
+
+ @Override
public void returnObject(final long accessId) throws RjException {
try {
synchronized (this) {
if (this.accessId != accessId) {
- throw new IllegalStateException("Access id no longer valid.");
+ throw new RjClosedException("RServi instance is no longer valid.");
}
invalidateClient();
}