Bug 567730: [RJ-Servi] Shut down R engine of nodes gracefully
- Change default start/stop timeout to 10s
Change-Id: I113adadde6765fa1e174089f6caedff020de3df5
diff --git a/core/org.eclipse.statet.rj.server/src/org/eclipse/statet/rj/server/srv/engine/SrvEngineServer.java b/core/org.eclipse.statet.rj.server/src/org/eclipse/statet/rj/server/srv/engine/SrvEngineServer.java
index bc61636..f7a98ca 100644
--- a/core/org.eclipse.statet.rj.server/src/org/eclipse/statet/rj/server/srv/engine/SrvEngineServer.java
+++ b/core/org.eclipse.statet.rj.server/src/org/eclipse/statet/rj/server/srv/engine/SrvEngineServer.java
@@ -36,6 +36,7 @@
import org.eclipse.statet.jcommons.lang.NonNullByDefault;
import org.eclipse.statet.jcommons.lang.Nullable;
+import org.eclipse.statet.rj.RjClosedException;
import org.eclipse.statet.rj.RjException;
import org.eclipse.statet.rj.data.RDataUtils;
import org.eclipse.statet.rj.data.RObject;
@@ -78,6 +79,17 @@
protected static final Logger LOGGER= Logger.getLogger("org.eclipse.statet.rj.server");
+ protected static final class RjExitException extends RjClosedException { // Thrown in this class when a received RjsStatus has code Server.S_LOST, S_NOT_STARTED or S_STOPPED.
+
+ private static final long serialVersionUID= 1L;
+
+ public RjExitException(final String message) { // message is handed unchanged to the RjClosedException constructor
+ super(message);
+ }
+
+ }
+
+
protected final RMIServerControl control;
protected SrvEngine srvEngine;
@@ -232,7 +244,9 @@
sendCom= this.serverC2SList;
WAIT_FOR_ANSWER: while (true) {
+// System.out.println("\n>> SEND ======" + Thread.currentThread() + "\n" + sendItem + "==\n");
final RjsComObject receivedCom= runMainLoop(sendCom, null);
+// System.out.println("\n<< RECEIVED ==" + Thread.currentThread() + "\n" + receivedCom + "==\n");
sendCom= null;
COM_TYPE: switch (receivedCom.getComType()) {
@@ -240,7 +254,15 @@
sendCom= RjsStatus.OK_STATUS;
break COM_TYPE;
case RjsComObject.T_STATUS:
- final RjsStatus status= (RjsStatus) receivedCom;
+ final RjsStatus status= (RjsStatus)receivedCom;
+ if ((status.getCode() & 0xffffff00) == 0) {
+ switch (status.getCode()) {
+ case Server.S_LOST:
+ case Server.S_NOT_STARTED:
+ case Server.S_STOPPED:
+ throw new RjExitException(status.getMessage());
+ }
+ }
switch (status.getSeverity()) {
case RjsStatus.OK:
break COM_TYPE;
@@ -255,13 +277,14 @@
break COM_TYPE;
}
case RjsComObject.T_MAIN_LIST:
- final MainCmdS2CList list= (MainCmdS2CList) receivedCom;
+ final MainCmdS2CList list= (MainCmdS2CList)receivedCom;
MainCmdItem item= list.getItems();
while (item != null) {
if (item == sendItem) {
answer= sendItem;
break COM_TYPE;
}
+ processServerCmdItem(item);
item= item.next;
}
break COM_TYPE;
@@ -270,7 +293,7 @@
this.serverC2SList.clear();
if (answer == null || !answer.isOK()) {
final RjsStatus status= (answer != null) ? answer.getStatus() : MISSING_ANSWER_STATUS;
- throw new RjException("R commands failed: "+status.getMessage()+".");
+ throw new RjException("R commands failed: " + status.getMessage() + ".");
}
return answer.getData();
}
@@ -282,6 +305,18 @@
}
}
+ protected void processServerCmdItem(final MainCmdItem item) { // Logs console read/write command items at INFO level; items of any other command type are ignored.
+ switch (item.getCmdType()) {
+ case MainCmdItem.T_CONSOLE_READ_ITEM: // logged with the "R-PROMPT: " prefix
+ LOGGER.log(Level.INFO, "R-PROMPT: " + item.getDataText());
+ break;
+ case MainCmdItem.T_CONSOLE_WRITE_ITEM: // logged with the "R-OUT (<op>): " prefix, <op> being item.getOp()
+ LOGGER.log(Level.INFO, "R-OUT (" + item.getOp() + "): " + item.getDataText());
+ break;
+ }
+ }
+
+
protected RjsComObject runMainLoop(final RjsComObject com, final Object caller) throws RemoteException {
return this.srvEngine.runMainLoop(this.serverClient, com);
}
diff --git a/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/LocalNodeTest.java b/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/LocalNodeTest.java
index f4b3820..eb4a2b8 100644
--- a/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/LocalNodeTest.java
+++ b/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/LocalNodeTest.java
@@ -21,6 +21,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.NoSuchElementException;
+import java.util.concurrent.TimeUnit;
import javax.management.OperationsException;
import javax.security.auth.login.LoginException;
@@ -137,8 +138,13 @@
final var wd= Path.of(RDataUtils.checkSingleCharValue(servi1.evalData("getwd()", m)));
servi1.evalVoid("writeLines(\"Hello\", \"test.txt\")", m);
- final var file= wd.resolve("test.txt");
- assertTrue(Files.isRegularFile(file));
+ final var wdFile= wd.resolve("test.txt");
+ assertTrue(Files.isRegularFile(wdFile));
+
+ final var tempDir= Path.of(RDataUtils.checkSingleCharValue(servi1.evalData("tempdir()", m)));
+ servi1.evalVoid("writeLines(\"Hello\", paste(tempdir(), \"test.txt\", sep=\"/\"))", m);
+ final var tempFile= tempDir.resolve("test.txt");
+ assertTrue(Files.isRegularFile(tempFile));
closeServi(servi1);
@@ -151,8 +157,41 @@
catch (final InterruptedException e) {}
}
- assertTrue(Files.notExists(file));
+ assertTrue(Files.notExists(wdFile));
assertTrue(Files.notExists(wd));
+ assertTrue(Files.notExists(tempFile));
+ assertTrue(Files.notExists(tempDir));
+ }
+
+ @Test
+ public void timeout_stop() throws RjException, OperationsException,
+ NoSuchElementException, LoginException, StatusException, UnexpectedRDataException { // Checks that this.localR.stop() returns in under 11500 ms while another thread is evaluating Sys.sleep(20L).
+ final ProgressMonitor m= new NullProgressMonitor();
+
+ this.localR.start();
+
+ final RServi servi1= getServi("test1");
+
+ final Thread blocker= new Thread() { // keeps an evaluation in flight on servi1 while stop() is called
+ @Override
+ public void run() {
+ try {
+ servi1.evalVoid("Sys.sleep(20L)", m);
+ }
+ catch (final Exception e) { // presumably the evaluation fails once the node is stopped; any failure is deliberately ignored
+ }
+ }
+ };
+ blocker.start();
+ try {
+ Thread.sleep(500); // presumably gives the blocker thread time to issue the evaluation before stop() is called
+ }
+ catch (final InterruptedException e) {}
+
+ final long t1= System.nanoTime();
+ this.localR.stop();
+ final long waitTime= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t1); // elapsed time of stop() in milliseconds
+ assertTrue(waitTime < 11500); // NOTE(review): looks like the 10 s default start/stop timeout plus a margin — confirm
}
}
diff --git a/servi/org.eclipse.statet.rj.servi/META-INF/MANIFEST.MF b/servi/org.eclipse.statet.rj.servi/META-INF/MANIFEST.MF
index ed59a67..30a7c8c 100644
--- a/servi/org.eclipse.statet.rj.servi/META-INF/MANIFEST.MF
+++ b/servi/org.eclipse.statet.rj.servi/META-INF/MANIFEST.MF
@@ -14,6 +14,7 @@
Import-Package: org.apache.commons.pool2;version="[2.6.2,3.0.0)";resolution:=optional,
org.apache.commons.pool2.impl;version="[2.6.2,3.0.0)";resolution:=optional,
org.eclipse.statet.jcommons.collections;version="4.3.0",
+ org.eclipse.statet.jcommons.concurrent;version="4.3.0",
org.eclipse.statet.jcommons.io;version="4.3.0",
org.eclipse.statet.jcommons.lang;version="4.3.0",
org.eclipse.statet.jcommons.rmi;version="4.3.0",
diff --git a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeFactory.java b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeFactory.java
index a152644..450732b 100644
--- a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeFactory.java
+++ b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeFactory.java
@@ -27,12 +27,15 @@
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.rmi.NotBoundException;
+import java.rmi.UnmarshalException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -40,6 +43,7 @@
import org.eclipse.statet.jcommons.collections.CollectionUtils;
import org.eclipse.statet.jcommons.collections.ImCollections;
import org.eclipse.statet.jcommons.collections.ImList;
+import org.eclipse.statet.jcommons.concurrent.DaemonThreadFactory;
import org.eclipse.statet.jcommons.io.FileUtils;
import org.eclipse.statet.jcommons.lang.NonNullByDefault;
import org.eclipse.statet.jcommons.lang.Nullable;
@@ -102,6 +106,9 @@
return args;
}
+ private static final ScheduledExecutorService MONITOR_EXECUTOR= Executors.newSingleThreadScheduledExecutor(
+ new DaemonThreadFactory("LocalNodeFactory-Monitor") );
+
private static class ProcessConfig {
@@ -135,7 +142,7 @@
private boolean verbose;
- private long timeoutNanos= TimeUnit.SECONDS.toNanos(30);
+ private long timeoutNanos= TimeUnit.SECONDS.toNanos(10);
private final List<String> sslPropertyArgs;
@@ -642,38 +649,46 @@
@Override
public void stopNode(final NodeHandler handler) {
- final long t= System.nanoTime();
- final long timeout= this.timeoutNanos;
-
- try {
- handler.shutdown();
- }
- catch (final Throwable e) {
- logWarning(handler, Messages.ShutdownNode_error_message, e);
- }
-
final Process process= handler.process;
handler.process= null;
- if (process != null) {
- for (int i= 0; i < Integer.MAX_VALUE; i++) {
+
+ class StopMonitor implements Runnable {
+
+ volatile boolean killed;
+
+ @Override
+ public synchronized void run() {
try {
- Thread.sleep(250);
- }
- catch (final InterruptedException e) {
- }
- try {
- process.exitValue();
- break;
- }
- catch (final IllegalThreadStateException e) {
- final long diff= System.nanoTime() - t;
- if (i >= 10 && timeout >= 0 && diff > timeout) {
+ if (process != null && process.isAlive()) {
+ this.killed= true;
process.destroy();
logWarning(handler, "Killed R node, because it didn't stop regularly.");
- break;
}
- continue;
}
+ catch (final Throwable e) {
+ logError(handler, "A runtime error occurred in StopMonitor.", e);
+ }
+ }
+
+ }
+ final var monitor= new StopMonitor();
+ MONITOR_EXECUTOR.schedule(monitor, this.timeoutNanos, TimeUnit.NANOSECONDS);
+ try {
+ handler.shutdown();
+
+ if (process != null) {
+ try {
+ process.waitFor();
+ }
+ catch (final InterruptedException e) {}
+ }
+ }
+ catch (final Throwable e) {
+ if (monitor.killed && e instanceof UnmarshalException) {
+ // ignore
+ }
+ else {
+ logWarning(handler, Messages.ShutdownNode_error_message, e);
}
}
@@ -732,4 +747,9 @@
log(handler, Status.WARNING, mainMessage, null, null);
}
+ private void logError(final NodeHandler handler, final String mainMessage, // Delegates to log(...) with Status.ERROR for the given handler and message.
+ final @Nullable Throwable e) { // e may be null; it is passed on as the last argument of log(...)
+ log(handler, Status.ERROR, mainMessage, null, e);
+ }
+
}
diff --git a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeManager.java b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeManager.java
index c50270a..2b97605 100644
--- a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeManager.java
+++ b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeManager.java
@@ -110,10 +110,8 @@
}
CommonsRuntime.getEnvironment().removeStoppingListener(this);
if (this.inUse) {
- returnRServi(this.accessId);
- if (this.handler == null) {
- return;
- }
+ this.inUse= false;
+ this.accessId++;
}
final ThisNodeHandler handler= nonNullAssert(this.handler);
this.handler= null;
diff --git a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/rj/servi/node/RServiNodeConfig.java b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/rj/servi/node/RServiNodeConfig.java
index 605f7e1..d9faf4a 100644
--- a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/rj/servi/node/RServiNodeConfig.java
+++ b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/rj/servi/node/RServiNodeConfig.java
@@ -77,7 +77,7 @@
*/
public static final String STARTSTOP_TIMEOUT__ID= "startstop_timeout.millis";
- private static final long STARTSTOP_TIMEOUT_DEFAULT= 30 * 1000;
+ private static final long STARTSTOP_TIMEOUT_DEFAULT= 10 * 1000;
private @Nullable String rHome;
diff --git a/servi/org.eclipse.statet.rj.servi/srcServer/org/eclipse/statet/internal/rj/servi/server/NodeServer.java b/servi/org.eclipse.statet.rj.servi/srcServer/org/eclipse/statet/internal/rj/servi/server/NodeServer.java
index b9951ab..07a4a72 100644
--- a/servi/org.eclipse.statet.rj.servi/srcServer/org/eclipse/statet/internal/rj/servi/server/NodeServer.java
+++ b/servi/org.eclipse.statet.rj.servi/srcServer/org/eclipse/statet/internal/rj/servi/server/NodeServer.java
@@ -14,7 +14,6 @@
package org.eclipse.statet.internal.rj.servi.server;
-import java.io.PrintStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Path;
import java.rmi.ConnectException;
@@ -24,8 +23,6 @@
import java.rmi.server.UnicastRemoteObject;
import java.util.HashMap;
import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.logging.Level;
import javax.security.auth.login.LoginException;
@@ -58,30 +55,30 @@
public class NodeServer extends SrvEngineServer {
- class ConsoleDummy extends Thread {
+ class ConsoleMockup extends Thread {
private final Client client;
- private final PrintStream out;
private final MainCmdC2SList c2sList= new MainCmdC2SList();
- ConsoleDummy(final Client client) {
+ private int waitMillis= 5000;
+
+ ConsoleMockup() {
setName("R Console");
setDaemon(true);
setPriority(NORM_PRIORITY-1);
- this.client= client;
- this.out= System.out;
+ this.client= NodeServer.this.consoleMockupClient;
}
@Override
public void run() {
try {
synchronized (NodeServer.this.srvEngine) {
- if (NodeServer.this.isConsoleEnabled || NodeServer.this.isConsoleDummyRunning) {
+ if (NodeServer.this.isConsoleEnabled || NodeServer.this.consoleMockup != null) {
return;
}
+ NodeServer.this.consoleMockup= this;
NodeServer.this.srvEngine.connect(this.client, new HashMap<String, Object>());
- NodeServer.this.isConsoleDummyRunning= true;
}
RjsComObject sendCom= null;
@@ -103,25 +100,23 @@
case RjsComObject.T_MAIN_LIST:
MainCmdItem item= ((MainCmdS2CList) receivedCom).getItems();
MainCmdItem tmp;
- ITER_ITEMS : for (; (item != null); tmp= item, item= item.next, tmp.next= null) {
- switch (item.getCmdType()) {
- case MainCmdItem.T_CONSOLE_WRITE_ITEM:
- this.out.println("R-OUT (" + item.getOp() + "): " + item.getDataText());
- break;
- case MainCmdItem.T_CONSOLE_READ_ITEM:
- this.out.println("R-PROMPT: " + item.getDataText());
- break;
- }
+ for (; (item != null); tmp= item, item= item.next, tmp.next= null) {
+ processServerCmdItem(item);
}
break;
case RjsComObject.T_STATUS:
- switch (((RjsStatus) receivedCom).getCode()) {
+ switch (((RjsStatus)receivedCom).getCode()) {
case Server.S_DISCONNECTED:
throw new ConnectException("");
case Server.S_LOST:
case Server.S_NOT_STARTED:
case Server.S_STOPPED:
- return;
+ synchronized (NodeServer.this.srvEngine) {
+ if (NodeServer.this.consoleMockup == this) {
+ NodeServer.this.consoleMockup= null;
+ }
+ return;
+ }
}
}
}
@@ -129,7 +124,9 @@
catch (final ConnectException e) {
synchronized (NodeServer.this.srvEngine) {
if (NodeServer.this.isConsoleEnabled) {
- NodeServer.this.isConsoleDummyRunning= false;
+ if (NodeServer.this.consoleMockup == this) {
+ NodeServer.this.consoleMockup= null;
+ }
return;
}
NodeServer.this.srvEngine.connect(this.client, new HashMap<String, Object>());
@@ -146,20 +143,25 @@
}
if (sendCom == null) {
try {
- sleep(5000);
+ sleep(this.waitMillis);
}
catch (final InterruptedException e) {
}
}
}
}
- catch (final Exception e) {
+ catch (final Throwable e) {
LOGGER.log(Level.SEVERE,
"An error occurred when running dummy R REPL. Stopping REPL.",
e );
}
}
+ public void aboutToStop() { // Called from NodeServer.shutdown(): shortens the idle wait used by run() and interrupts this thread.
+ this.waitMillis= 100; // run() passes waitMillis to sleep(...) when it has nothing to send
+ interrupt(); // ends a sleep(...) that is currently in progress in run()
+ }
+
}
class Node implements RServiNode {
@@ -182,8 +184,8 @@
NodeServer.this.authMethod= new NoAuthMethod("<internal>");
enabled= NodeServer.this.isConsoleEnabled= false;
// LOGGER.fine("before start");
- if (!NodeServer.this.isConsoleDummyRunning) {
- new ConsoleDummy(NodeServer.this.consoleDummyClient).start();
+ if (NodeServer.this.consoleMockup == null) {
+ new ConsoleMockup().start();
}
// LOGGER.fine("after start");
}
@@ -278,11 +280,11 @@
private boolean isConsoleEnabled;
- private boolean isConsoleDummyRunning;
private final ServerAuthMethod rserviAuthMethod;
- private final Client consoleDummyClient;
+ private final Client consoleMockupClient;
+ private @Nullable ConsoleMockup consoleMockup;
private @Nullable String currentClientLabel;
private @Nullable Backend currentClientBackend;
@@ -296,7 +298,7 @@
public NodeServer(final RMIServerControl control) {
super(control, new NoAuthMethod("<internal>")); //$NON-NLS-1$
this.rserviAuthMethod= new NoAuthMethod("<internal>"); //$NON-NLS-1$
- this.consoleDummyClient= new Client("-", "dummy", (byte)0); //$NON-NLS-1$
+ this.consoleMockupClient= new Client("-", "dummy", (byte)0); //$NON-NLS-1$
}
@@ -322,7 +324,7 @@
final Map<String, Object> properties= new HashMap<>();
properties.put("args", new String[0]);
- this.srvEngine.start(this.consoleDummyClient, properties);
+ this.srvEngine.start(this.consoleMockupClient, properties);
try {
synchronized (this.serviRunLock) {
@@ -419,22 +421,36 @@
void shutdown() {
this.control.checkCleanup();
- new Timer(true).schedule(new TimerTask() {
- @Override
- public void run() {
- try {
- unbindClient();
- }
- catch (final Exception e) {
- LOGGER.log(Level.SEVERE,
- "An error occurred when unbinding the client for shutdown.",
- e );
- }
- finally {
- Runtime.getRuntime().exit(0);
- }
+ // Runs in the calling thread: unbind the client, notify the console mockup, then evaluate q("no") in R.
+ try { // step 1: unbind the client; a failure is logged and the shutdown continues
+ LOGGER.log(Level.FINE, "Shutting down R node: Unbind client...");
+ unbindClient();
+ }
+ catch (final Exception e) {
+ LOGGER.log(Level.SEVERE,
+ "An error occurred when unbinding the client for shutdown.",
+ e );
+ }
+ // NOTE(review): the removed TimerTask called Runtime.getRuntime().exit(0); no explicit JVM exit remains here — presumably q() ends the process, confirm
+ final ConsoleMockup consoleMockup= this.consoleMockup;
+ if (consoleMockup != null) { // step 2: only if a console mockup thread is registered
+ consoleMockup.aboutToStop();
+ }
+ try { // step 3: evaluate base::q("no") through the server loop while holding serviRunLock
+ LOGGER.log(Level.FINE, "Shutting down R node: Exit R engine...");
+ synchronized (this.serviRunLock) {
+ runServerLoopCommand(new DataCmdItem(DataCmdItem.EVAL_EXPR_VOID, 0,
+ "base::q(\"no\")", null, null, null )); // R's q("no") quits without saving the workspace
}
- }, 500L );
+ }
+ catch (final RjExitException e) {
+ // expected
+ }
+ catch (final Exception e) { // any other failure is logged; nothing is rethrown to the caller
+ LOGGER.log(Level.SEVERE,
+ "An error occurred when exiting the R engine for shutdown.",
+ e );
+ }
}
public void setProperties(final Map<String, ? extends @NonNull Object> properties, final Object caller) throws RemoteException {