Bug 569493: [RJ-Servi] Shut down R engine of nodes gracefully
Backport-of: 731203f5f8dc8e93aa2faa07c4f8e7c1761ebcc5
Change-Id: Ia02cee9ce2c13fb02f7702bb8345074cf6707cde
diff --git a/core/org.eclipse.statet.rj.server/src/org/eclipse/statet/rj/server/srv/engine/SrvEngineServer.java b/core/org.eclipse.statet.rj.server/src/org/eclipse/statet/rj/server/srv/engine/SrvEngineServer.java
index 514b95d..9fc0199 100644
--- a/core/org.eclipse.statet.rj.server/src/org/eclipse/statet/rj/server/srv/engine/SrvEngineServer.java
+++ b/core/org.eclipse.statet.rj.server/src/org/eclipse/statet/rj/server/srv/engine/SrvEngineServer.java
@@ -27,6 +27,7 @@
import javax.security.auth.login.LoginException;
+import org.eclipse.statet.rj.RjClosedException;
import org.eclipse.statet.rj.RjException;
import org.eclipse.statet.rj.data.RObject;
import org.eclipse.statet.rj.server.DataCmdItem;
@@ -68,6 +69,17 @@
protected static final Logger LOGGER= Logger.getLogger("org.eclipse.statet.rj.server");
+ protected static final class RjExitException extends RjClosedException { // thrown in this file when a received T_STATUS answer carries the code S_LOST, S_NOT_STARTED or S_STOPPED
+
+ private static final long serialVersionUID= 1L;
+
+ public RjExitException(final String message) { // message: passed through unchanged to RjClosedException
+ super(message);
+ }
+
+ }
+
+
protected final RMIServerControl control;
protected SrvEngine srvEngine;
@@ -226,7 +238,9 @@
sendCom= this.serverC2SList;
WAIT_FOR_ANSWER: while (true) {
+// System.out.println("\n>> SEND ======" + Thread.currentThread() + "\n" + sendItem + "==\n");
final RjsComObject receivedCom= runMainLoop(sendCom, null);
+// System.out.println("\n<< RECEIVED ==" + Thread.currentThread() + "\n" + receivedCom + "==\n");
sendCom= null;
COM_TYPE: switch (receivedCom.getComType()) {
@@ -234,7 +248,15 @@
sendCom= RjsStatus.OK_STATUS;
break COM_TYPE;
case RjsComObject.T_STATUS:
- final RjsStatus status= (RjsStatus) receivedCom;
+ final RjsStatus status= (RjsStatus)receivedCom;
+ if ((status.getCode() & 0xffffff00) == 0) {
+ switch (status.getCode()) {
+ case Server.S_LOST:
+ case Server.S_NOT_STARTED:
+ case Server.S_STOPPED:
+ throw new RjExitException(status.getMessage());
+ }
+ }
switch (status.getSeverity()) {
case RjsStatus.OK:
break COM_TYPE;
@@ -256,6 +278,7 @@
answer= sendItem;
break COM_TYPE;
}
+ processServerCmdItem(item);
item= item.next;
}
break COM_TYPE;
@@ -276,6 +299,18 @@
}
}
+ protected void processServerCmdItem(final MainCmdItem item) { // logs console read/write items at INFO level; every other command type is ignored (no default branch)
+ switch (item.getCmdType()) {
+ case MainCmdItem.T_CONSOLE_READ_ITEM: // console read item: its data text is logged as the prompt
+ LOGGER.log(Level.INFO, "R-PROMPT: " + item.getDataText());
+ break;
+ case MainCmdItem.T_CONSOLE_WRITE_ITEM: // console write item: logged together with the value of getOp()
+ LOGGER.log(Level.INFO, "R-OUT (" + item.getOp() + "): " + item.getDataText());
+ break;
+ }
+ }
+
+
protected RjsComObject runMainLoop(final RjsComObject com, final Object caller) throws RemoteException {
return this.srvEngine.runMainLoop(this.serverClient, com);
}
diff --git a/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/AbstractServiTest.java b/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/AbstractServiTest.java
index 3b6a144..45886c5 100644
--- a/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/AbstractServiTest.java
+++ b/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/AbstractServiTest.java
@@ -135,7 +135,9 @@
try {
for (final RServi servi : this.servis) {
try {
- servi.close();
+ if (!servi.isClosed()) {
+ servi.close();
+ }
}
catch (final Throwable e) {
exceptions.add(e);
diff --git a/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/LocalNodeTest.java b/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/LocalNodeTest.java
index 508c784..b5eecfb 100644
--- a/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/LocalNodeTest.java
+++ b/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/LocalNodeTest.java
@@ -17,9 +17,13 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.concurrent.TimeUnit;
import javax.management.OperationsException;
import javax.security.auth.login.LoginException;
@@ -32,6 +36,7 @@
import org.eclipse.statet.jcommons.rmi.RMIRegistry;
import org.eclipse.statet.jcommons.rmi.RMIRegistryManager;
import org.eclipse.statet.jcommons.status.NullProgressMonitor;
+import org.eclipse.statet.jcommons.status.ProgressMonitor;
import org.eclipse.statet.jcommons.status.Status;
import org.eclipse.statet.jcommons.status.StatusException;
@@ -154,4 +159,95 @@
closeServi(servi2);
}
+ @Test
+ public void cleanup_WorkingDir() throws RjException, OperationsException,
+ NoSuchElementException, LoginException, StatusException, UnexpectedRDataException { // checks that a file written into the R working directory by one session is gone once the next servi has been obtained
+ final ProgressMonitor m= new NullProgressMonitor();
+
+ this.localR.start();
+
+ final RServi servi1= getServi("test1");
+
+ final Path wd= Paths.get(RDataUtils.checkSingleCharValue(servi1.evalData("getwd()", m))); // working directory as reported by R itself
+ servi1.evalVoid("writeLines(\"Hello\", \"test.txt\")", m); // relative file name, hence resolved against wd below
+ final Path file= wd.resolve("test.txt");
+ assertTrue(Files.isRegularFile(file)); // precondition: the file really was created
+
+ closeServi(servi1);
+
+ final RServi servi2= getServi("test1"); // NOTE(review): presumably the same node is handed out again - confirm
+ assertTrue(Files.notExists(file));
+
+ closeServi(servi2);
+ }
+
+ @Test
+ public void cleanup_afterStop() throws RjException, OperationsException,
+ NoSuchElementException, LoginException, StatusException, UnexpectedRDataException { // checks that the R working directory and the R tempdir (with the files written into them) no longer exist after localR.stop()
+ final ProgressMonitor m= new NullProgressMonitor();
+
+ this.localR.start();
+
+ final RServi servi1= getServi("test1");
+
+ final Path wd= Paths.get(RDataUtils.checkSingleCharValue(servi1.evalData("getwd()", m))); // working directory as reported by R
+ servi1.evalVoid("writeLines(\"Hello\", \"test.txt\")", m);
+ final Path wdFile= wd.resolve("test.txt");
+ assertTrue(Files.isRegularFile(wdFile)); // precondition: file exists in the working directory
+
+ final Path tempDir= Paths.get(RDataUtils.checkSingleCharValue(servi1.evalData("tempdir()", m))); // session temp directory as reported by R
+ servi1.evalVoid("writeLines(\"Hello\", paste(tempdir(), \"test.txt\", sep=\"/\"))", m);
+ final Path tempFile= tempDir.resolve("test.txt");
+ assertTrue(Files.isRegularFile(tempFile)); // precondition: file exists in the temp directory
+
+ closeServi(servi1);
+
+ this.localR.stop();
+
+ for (int i= 0; i < 5 && !Files.notExists(wd); i++) { // poll at most 5 times, 200 ms apart, until wd is confirmed absent
+ try {
+ Thread.sleep(200);
+ }
+ catch (final InterruptedException e) {} // an interrupt only shortens the wait; the loop re-checks
+ }
+
+ assertTrue(Files.notExists(wdFile));
+ assertTrue(Files.notExists(wd));
+ assertTrue(Files.notExists(tempFile));
+ assertTrue(Files.notExists(tempDir));
+ }
+
+ @Test
+ public void timeout_stop() throws RjException, OperationsException,
+ NoSuchElementException, LoginException, StatusException, UnexpectedRDataException { // checks that localR.stop() returns within the start/stop timeout (plus slack) while the node is busy
+ final ProgressMonitor m= new NullProgressMonitor();
+
+ this.localR.start();
+
+ final RServi servi1= getServi("test1");
+
+ final Thread blocker= new Thread() { // keeps the R engine busy in the background while stop() is measured
+ @Override
+ public void run() {
+ try {
+ servi1.evalVoid("Sys.sleep(20L)", m); // R sleeps for 20 seconds
+ }
+ catch (final Exception e) { // the evaluation is expected to fail once the node is stopped
+ }
+ }
+ };
+ blocker.start();
+ try {
+ Thread.sleep(500); // presumably gives the blocker time to start its evaluation
+ }
+ catch (final InterruptedException e) {}
+
+ final long t1= System.nanoTime();
+ this.localR.stop();
+ final long d1= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t1); // duration of stop() in ms
+ final long dMax= new RServiNodeConfig().getStartStopTimeout() + 1500; // timeout of a default node config plus 1500 ms slack
+ assertTrue(d1 < dMax, // assert against the same bound that the failure message reports
+ () -> String.format("duration expected: < %1$sms, actual: %2$sms", dMax, d1) );
+ }
+
}
diff --git a/servi/org.eclipse.statet.rj.servi/META-INF/MANIFEST.MF b/servi/org.eclipse.statet.rj.servi/META-INF/MANIFEST.MF
index 7579ac3..18d4d06 100644
--- a/servi/org.eclipse.statet.rj.servi/META-INF/MANIFEST.MF
+++ b/servi/org.eclipse.statet.rj.servi/META-INF/MANIFEST.MF
@@ -13,6 +13,7 @@
Import-Package: org.apache.commons.pool2;version="[2.6.2,3.0.0)";resolution:=optional,
org.apache.commons.pool2.impl;version="[2.6.2,3.0.0)";resolution:=optional,
org.eclipse.statet.jcommons.collections;version="4.2.0",
+ org.eclipse.statet.jcommons.concurrent;version="4.2.0",
org.eclipse.statet.jcommons.lang;version="4.2.0",
org.eclipse.statet.jcommons.rmi;version="4.2.0",
org.eclipse.statet.jcommons.runtime;version="4.2.0",
diff --git a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeFactory.java b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeFactory.java
index 09cead0..5dcac0d 100644
--- a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeFactory.java
+++ b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeFactory.java
@@ -27,18 +27,22 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.rmi.NotBoundException;
+import java.rmi.UnmarshalException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.eclipse.statet.jcommons.collections.CollectionUtils;
import org.eclipse.statet.jcommons.collections.ImCollections;
import org.eclipse.statet.jcommons.collections.ImList;
+import org.eclipse.statet.jcommons.concurrent.CommonThreadFactory;
import org.eclipse.statet.jcommons.lang.NonNullByDefault;
import org.eclipse.statet.jcommons.lang.Nullable;
import org.eclipse.statet.jcommons.lang.SystemUtils;
@@ -95,6 +99,15 @@
return args;
}
+ private static final ScheduledExecutorService MONITOR_EXECUTOR= Executors.newSingleThreadScheduledExecutor( // shared single-thread scheduler; stopNode schedules its StopMonitor on it
+ new CommonThreadFactory("LocalNodeFactory-Monitor") {
+ @Override
+ protected boolean getDaemon() { // presumably makes the factory create daemon threads - confirm against CommonThreadFactory
+ return true;
+ }
+ });
+
+
private static class ProcessConfig {
final Map<String, String> addEnv= new HashMap<>();
final List<String> command= new ArrayList<>();
@@ -119,7 +132,7 @@
private boolean verbose;
- private long timeoutNanos= TimeUnit.SECONDS.toNanos(30);
+ private long timeoutNanos= TimeUnit.SECONDS.toNanos(10);
private final List<String> sslPropertyArgs;
@@ -629,39 +642,46 @@
@Override
public void stopNode(final NodeHandler handler) {
- final long t= System.nanoTime();
- final long timeout= this.timeoutNanos;
-
- try {
- handler.shutdown();
- }
- catch (final Throwable e) {
- Utils.logWarning(Messages.ShutdownNode_error_message, e);
- }
-
final Process process= handler.process;
handler.process= null;
- if (process != null) {
- for (int i= 0; i < Integer.MAX_VALUE; i++) {
+
+ class StopMonitor implements Runnable {
+
+ volatile boolean killed;
+
+ @Override
+ public synchronized void run() {
try {
- Thread.sleep(250);
- }
- catch (final InterruptedException e) {
- }
- try {
- process.exitValue();
- break;
- }
- catch (final IllegalThreadStateException e) {
- final long diff= System.nanoTime() - t;
- if (i >= 10 && timeout >= 0 && diff > timeout) {
+ if (process != null && process.isAlive()) {
+ this.killed= true;
process.destroy();
- Utils.logWarning(String.format("Killed RServi node '%1$s'.",
- (handler.address != null) ? handler.address.getName() : "<address not available>" ));
- break;
+ Utils.logWarning("Killed R node, because it didn't stop regularly.");
}
- continue;
}
+ catch (final Throwable e) {
+ Utils.logError("A runtime error occurred in StopMonitor.", e);
+ }
+ }
+
+ }
+ final StopMonitor monitor= new StopMonitor();
+ MONITOR_EXECUTOR.schedule(monitor, this.timeoutNanos, TimeUnit.NANOSECONDS);
+ try {
+ handler.shutdown();
+
+ if (process != null) {
+ try {
+ process.waitFor();
+ }
+ catch (final InterruptedException e) {}
+ }
+ }
+ catch (final Throwable e) {
+ if (monitor.killed && e instanceof UnmarshalException) {
+ // ignore
+ }
+ else {
+ Utils.logWarning(Messages.ShutdownNode_error_message, e);
}
}
diff --git a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeManager.java b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeManager.java
index 433bb10..1c478f4 100644
--- a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeManager.java
+++ b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeManager.java
@@ -110,10 +110,8 @@
}
CommonsRuntime.getEnvironment().removeStoppingListener(this);
if (this.inUse) {
- returnRServi(this.accessId);
- if (this.handler == null) {
- return;
- }
+ this.inUse= false;
+ this.accessId++;
}
final ThisNodeHandler handler= nonNullAssert(this.handler);
this.handler= null;
diff --git a/servi/org.eclipse.statet.rj.servi/srcServer/org/eclipse/statet/internal/rj/servi/server/NodeServer.java b/servi/org.eclipse.statet.rj.servi/srcServer/org/eclipse/statet/internal/rj/servi/server/NodeServer.java
index 0047f49..b2173d6 100644
--- a/servi/org.eclipse.statet.rj.servi/srcServer/org/eclipse/statet/internal/rj/servi/server/NodeServer.java
+++ b/servi/org.eclipse.statet.rj.servi/srcServer/org/eclipse/statet/internal/rj/servi/server/NodeServer.java
@@ -23,12 +23,12 @@
import java.rmi.server.UnicastRemoteObject;
import java.util.HashMap;
import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.logging.Level;
import javax.security.auth.login.LoginException;
+import org.eclipse.statet.jcommons.lang.Nullable;
+
import org.eclipse.statet.rj.RjException;
import org.eclipse.statet.rj.server.DataCmdItem;
import org.eclipse.statet.rj.server.MainCmdC2SList;
@@ -52,18 +52,20 @@
public class NodeServer extends SrvEngineServer {
- class ConsoleDummy extends Thread {
+ class ConsoleMockup extends Thread {
private final Client client;
private final PrintStream out;
private final MainCmdC2SList c2sList= new MainCmdC2SList();
- ConsoleDummy(final Client client) {
+ private int waitMillis= 5000;
+
+ ConsoleMockup() {
setName("R Console");
setDaemon(true);
setPriority(NORM_PRIORITY-1);
- this.client= client;
+ this.client= NodeServer.this.consoleMockupClient;
this.out= System.out;
}
@@ -71,11 +73,11 @@
public void run() {
try {
synchronized (NodeServer.this.srvEngine) {
- if (NodeServer.this.isConsoleEnabled || NodeServer.this.isConsoleDummyRunning) {
+ if (NodeServer.this.isConsoleEnabled || NodeServer.this.consoleMockup != null) {
return;
}
+ NodeServer.this.consoleMockup= this;
NodeServer.this.srvEngine.connect(this.client, new HashMap<String, Object>());
- NodeServer.this.isConsoleDummyRunning= true;
}
RjsComObject sendCom= null;
@@ -109,13 +111,18 @@
}
break;
case RjsComObject.T_STATUS:
- switch (((RjsStatus) receivedCom).getCode()) {
+ switch (((RjsStatus)receivedCom).getCode()) {
case Server.S_DISCONNECTED:
throw new ConnectException("");
case Server.S_LOST:
case Server.S_NOT_STARTED:
case Server.S_STOPPED:
- return;
+ synchronized (NodeServer.this.srvEngine) {
+ if (NodeServer.this.consoleMockup == this) {
+ NodeServer.this.consoleMockup= null;
+ }
+ return;
+ }
}
}
}
@@ -123,7 +130,9 @@
catch (final ConnectException e) {
synchronized (NodeServer.this.srvEngine) {
if (NodeServer.this.isConsoleEnabled) {
- NodeServer.this.isConsoleDummyRunning= false;
+ if (NodeServer.this.consoleMockup == this) {
+ NodeServer.this.consoleMockup= null;
+ }
return;
}
NodeServer.this.srvEngine.connect(this.client, new HashMap<String, Object>());
@@ -138,7 +147,7 @@
}
if (sendCom == null) {
try {
- sleep(5000);
+ sleep(this.waitMillis);
}
catch (final InterruptedException e) {
}
@@ -150,6 +159,11 @@
}
}
+ public void aboutToStop() { // called by NodeServer.shutdown() before it sends the quit command to R
+ this.waitMillis= 100; // shortens the idle sleep of run(), which starts at 5000 ms
+ interrupt(); // interrupts this console thread; run() catches the InterruptedException of its sleep
+ }
+
}
class Node implements RServiNode {
@@ -172,8 +186,8 @@
NodeServer.this.authMethod= new NoAuthMethod("<internal>");
enabled= NodeServer.this.isConsoleEnabled= false;
// LOGGER.fine("before start");
- if (!NodeServer.this.isConsoleDummyRunning) {
- new ConsoleDummy(NodeServer.this.consoleDummyClient).start();
+ if (NodeServer.this.consoleMockup == null) {
+ new ConsoleMockup().start();
}
// LOGGER.fine("after start");
}
@@ -268,11 +282,11 @@
private boolean isConsoleEnabled;
- private boolean isConsoleDummyRunning;
private final ServerAuthMethod rserviAuthMethod;
- private final Client consoleDummyClient;
+ private final Client consoleMockupClient;
+ private @Nullable ConsoleMockup consoleMockup;
private String currentClientLabel;
private Backend currentClientBackend;
@@ -286,7 +300,7 @@
public NodeServer(final RMIServerControl control) {
super(control, new NoAuthMethod("<internal>")); //$NON-NLS-1$
this.rserviAuthMethod= new NoAuthMethod("<internal>"); //$NON-NLS-1$
- this.consoleDummyClient= new Client("-", "dummy", (byte) 0); //$NON-NLS-1$
+ this.consoleMockupClient= new Client("-", "dummy", (byte)0); //$NON-NLS-1$
}
@@ -312,7 +326,7 @@
final Map<String, Object> properties= new HashMap<>();
properties.put("args", new String[0]);
- this.srvEngine.start(this.consoleDummyClient, properties);
+ this.srvEngine.start(this.consoleMockupClient, properties);
try {
synchronized (this.serviRunLock) {
@@ -403,20 +417,36 @@
void shutdown() {
this.control.checkCleanup();
- new Timer(true).schedule(new TimerTask() {
- @Override
- public void run() {
- try {
- unbindClient();
- }
- catch (final Exception e) {
- e.printStackTrace();
- }
- finally {
- System.exit(0);
- }
+
+ try {
+ LOGGER.log(Level.FINE, "Shutting down R node: Unbind client...");
+ unbindClient();
+ }
+ catch (final Exception e) {
+ LOGGER.log(Level.SEVERE,
+ "An error occurred when unbinding the client for shutdown.",
+ e );
+ }
+
+ final ConsoleMockup consoleMockup= this.consoleMockup;
+ if (consoleMockup != null) {
+ consoleMockup.aboutToStop();
+ }
+ try {
+ LOGGER.log(Level.FINE, "Shutting down R node: Exit R engine...");
+ synchronized (this.serviRunLock) {
+ runServerLoopCommand(null, new DataCmdItem(DataCmdItem.EVAL_EXPR_VOID, 0,
+ "base::q(\"no\")", null, null, null ));
}
- }, 500L);
+ }
+ catch (final RjExitException e) {
+ // expected
+ }
+ catch (final Exception e) {
+ LOGGER.log(Level.SEVERE,
+ "An error occurred when exiting the R engine for shutdown.",
+ e );
+ }
}
public void setProperties(final Map<String, ? extends Object> properties, final Object caller) throws RemoteException {