Bug 567730: [RJ-Servi] Shutdown R engine of nodes gracefully

  - Change default start/stop timeout to 10s

Change-Id: I113adadde6765fa1e174089f6caedff020de3df5
diff --git a/core/org.eclipse.statet.rj.server/src/org/eclipse/statet/rj/server/srv/engine/SrvEngineServer.java b/core/org.eclipse.statet.rj.server/src/org/eclipse/statet/rj/server/srv/engine/SrvEngineServer.java
index bc61636..f7a98ca 100644
--- a/core/org.eclipse.statet.rj.server/src/org/eclipse/statet/rj/server/srv/engine/SrvEngineServer.java
+++ b/core/org.eclipse.statet.rj.server/src/org/eclipse/statet/rj/server/srv/engine/SrvEngineServer.java
@@ -36,6 +36,7 @@
 import org.eclipse.statet.jcommons.lang.NonNullByDefault;
 import org.eclipse.statet.jcommons.lang.Nullable;
 
+import org.eclipse.statet.rj.RjClosedException;
 import org.eclipse.statet.rj.RjException;
 import org.eclipse.statet.rj.data.RDataUtils;
 import org.eclipse.statet.rj.data.RObject;
@@ -78,6 +79,17 @@
 	
 	protected static final Logger LOGGER= Logger.getLogger("org.eclipse.statet.rj.server");
 	
+	protected static final class RjExitException extends RjClosedException {
+		
+		private static final long serialVersionUID= 1L;
+		
+		public RjExitException(final String message) {
+			super(message);
+		}
+		
+	}
+	
+	
 	protected final RMIServerControl control;
 	protected SrvEngine srvEngine;
 	
@@ -232,7 +244,9 @@
 			sendCom= this.serverC2SList;
 			
 			WAIT_FOR_ANSWER: while (true) {
+//				System.out.println("\n>> SEND ======" + Thread.currentThread() + "\n" + sendItem + "==\n");
 				final RjsComObject receivedCom= runMainLoop(sendCom, null);
+//				System.out.println("\n<< RECEIVED ==" + Thread.currentThread() + "\n" + receivedCom + "==\n");
 				sendCom= null;
 				
 				COM_TYPE: switch (receivedCom.getComType()) {
@@ -240,7 +254,15 @@
 					sendCom= RjsStatus.OK_STATUS;
 					break COM_TYPE;
 				case RjsComObject.T_STATUS:
-					final RjsStatus status= (RjsStatus) receivedCom;
+					final RjsStatus status= (RjsStatus)receivedCom;
+					if ((status.getCode() & 0xffffff00) == 0) {
+						switch (status.getCode()) {
+						case Server.S_LOST:
+						case Server.S_NOT_STARTED:
+						case Server.S_STOPPED:
+							throw new RjExitException(status.getMessage());
+						}
+					}
 					switch (status.getSeverity()) {
 					case RjsStatus.OK:
 						break COM_TYPE;
@@ -255,13 +277,14 @@
 						break COM_TYPE;
 					}
 				case RjsComObject.T_MAIN_LIST:
-					final MainCmdS2CList list= (MainCmdS2CList) receivedCom;
+					final MainCmdS2CList list= (MainCmdS2CList)receivedCom;
 					MainCmdItem item= list.getItems();
 					while (item != null) {
 						if (item == sendItem) {
 							answer= sendItem;
 							break COM_TYPE;
 						}
+						processServerCmdItem(item);
 						item= item.next;
 					}
 					break COM_TYPE;
@@ -270,7 +293,7 @@
 			this.serverC2SList.clear();
 			if (answer == null || !answer.isOK()) {
 				final RjsStatus status= (answer != null) ? answer.getStatus() : MISSING_ANSWER_STATUS;
-				throw new RjException("R commands failed: "+status.getMessage()+".");
+				throw new RjException("R commands failed: " + status.getMessage() + ".");
 			}
 			return answer.getData();
 		}
@@ -282,6 +305,18 @@
 		}
 	}
 	
+	protected void processServerCmdItem(final MainCmdItem item) {
+		switch (item.getCmdType()) {
+		case MainCmdItem.T_CONSOLE_READ_ITEM:
+			LOGGER.log(Level.INFO, "R-PROMPT: " + item.getDataText());
+			break;
+		case MainCmdItem.T_CONSOLE_WRITE_ITEM:
+			LOGGER.log(Level.INFO, "R-OUT (" + item.getOp() + "): " + item.getDataText());
+			break;
+		}
+	}
+	
+	
 	protected RjsComObject runMainLoop(final RjsComObject com, final Object caller) throws RemoteException {
 		return this.srvEngine.runMainLoop(this.serverClient, com);
 	}
diff --git a/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/LocalNodeTest.java b/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/LocalNodeTest.java
index f4b3820..eb4a2b8 100644
--- a/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/LocalNodeTest.java
+++ b/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/LocalNodeTest.java
@@ -21,6 +21,7 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.NoSuchElementException;
+import java.util.concurrent.TimeUnit;
 
 import javax.management.OperationsException;
 import javax.security.auth.login.LoginException;
@@ -137,8 +138,13 @@
 		
 		final var wd= Path.of(RDataUtils.checkSingleCharValue(servi1.evalData("getwd()", m)));
 		servi1.evalVoid("writeLines(\"Hello\", \"test.txt\")", m);
-		final var file= wd.resolve("test.txt");
-		assertTrue(Files.isRegularFile(file));
+		final var wdFile= wd.resolve("test.txt");
+		assertTrue(Files.isRegularFile(wdFile));
+		
+		final var tempDir= Path.of(RDataUtils.checkSingleCharValue(servi1.evalData("tempdir()", m)));
+		servi1.evalVoid("writeLines(\"Hello\", paste(tempdir(), \"test.txt\", sep=\"/\"))", m);
+		final var tempFile= tempDir.resolve("test.txt");
+		assertTrue(Files.isRegularFile(tempFile));
 		
 		closeServi(servi1);
 		
@@ -151,8 +157,41 @@
 			catch (final InterruptedException e) {}
 		}
 		
-		assertTrue(Files.notExists(file));
+		assertTrue(Files.notExists(wdFile));
 		assertTrue(Files.notExists(wd));
+		assertTrue(Files.notExists(tempFile));
+		assertTrue(Files.notExists(tempDir));
+	}
+	
+	@Test
+	public void timeout_stop() throws RjException, OperationsException,
+			NoSuchElementException, LoginException, StatusException, UnexpectedRDataException {
+		final ProgressMonitor m= new NullProgressMonitor();
+		
+		this.localR.start();
+		
+		final RServi servi1= getServi("test1");
+		
+		final Thread blocker= new Thread() {
+			@Override
+			public void run() {
+				try {
+					servi1.evalVoid("Sys.sleep(20L)", m);
+				}
+				catch (final Exception e) {
+				}
+			}
+		};
+		blocker.start();
+		try {
+			Thread.sleep(500);
+		}
+		catch (final InterruptedException e) {}
+		
+		final long t1= System.nanoTime();
+		this.localR.stop();
+		final long waitTime= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t1);
+		assertTrue(waitTime < 11500);
 	}
 	
 }
diff --git a/servi/org.eclipse.statet.rj.servi/META-INF/MANIFEST.MF b/servi/org.eclipse.statet.rj.servi/META-INF/MANIFEST.MF
index ed59a67..30a7c8c 100644
--- a/servi/org.eclipse.statet.rj.servi/META-INF/MANIFEST.MF
+++ b/servi/org.eclipse.statet.rj.servi/META-INF/MANIFEST.MF
@@ -14,6 +14,7 @@
 Import-Package: org.apache.commons.pool2;version="[2.6.2,3.0.0)";resolution:=optional,
  org.apache.commons.pool2.impl;version="[2.6.2,3.0.0)";resolution:=optional,
  org.eclipse.statet.jcommons.collections;version="4.3.0",
+ org.eclipse.statet.jcommons.concurrent;version="4.3.0",
  org.eclipse.statet.jcommons.io;version="4.3.0",
  org.eclipse.statet.jcommons.lang;version="4.3.0",
  org.eclipse.statet.jcommons.rmi;version="4.3.0",
diff --git a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeFactory.java b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeFactory.java
index a152644..450732b 100644
--- a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeFactory.java
+++ b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeFactory.java
@@ -27,12 +27,15 @@
 import java.nio.file.InvalidPathException;
 import java.nio.file.Path;
 import java.rmi.NotBoundException;
+import java.rmi.UnmarshalException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
@@ -40,6 +43,7 @@
 import org.eclipse.statet.jcommons.collections.CollectionUtils;
 import org.eclipse.statet.jcommons.collections.ImCollections;
 import org.eclipse.statet.jcommons.collections.ImList;
+import org.eclipse.statet.jcommons.concurrent.DaemonThreadFactory;
 import org.eclipse.statet.jcommons.io.FileUtils;
 import org.eclipse.statet.jcommons.lang.NonNullByDefault;
 import org.eclipse.statet.jcommons.lang.Nullable;
@@ -102,6 +106,9 @@
 		return args;
 	}
 	
+	private static final ScheduledExecutorService MONITOR_EXECUTOR= Executors.newSingleThreadScheduledExecutor(
+			new DaemonThreadFactory("LocalNodeFactory-Monitor") );
+	
 	
 	private static class ProcessConfig {
 		
@@ -135,7 +142,7 @@
 	
 	private boolean verbose;
 	
-	private long timeoutNanos= TimeUnit.SECONDS.toNanos(30);
+	private long timeoutNanos= TimeUnit.SECONDS.toNanos(10);
 	
 	private final List<String> sslPropertyArgs;
 	
@@ -642,38 +649,46 @@
 	
 	@Override
 	public void stopNode(final NodeHandler handler) {
-		final long t= System.nanoTime();
-		final long timeout= this.timeoutNanos;
-		
-		try {
-			handler.shutdown();
-		}
-		catch (final Throwable e) {
-			logWarning(handler, Messages.ShutdownNode_error_message, e);
-		}
-		
 		final Process process= handler.process;
 		handler.process= null;
-		if (process != null) {
-			for (int i= 0; i < Integer.MAX_VALUE; i++) {
+		
+		class StopMonitor implements Runnable {
+			
+			volatile boolean killed;
+			
+			@Override
+			public synchronized void run() {
 				try {
-					Thread.sleep(250);
-				}
-				catch (final InterruptedException e) {
-				}
-				try {
-					process.exitValue();
-					break;
-				}
-				catch (final IllegalThreadStateException e) {
-					final long diff= System.nanoTime() - t;
-					if (i >= 10 && timeout >= 0 && diff > timeout) {
+					if (process != null && process.isAlive()) {
+						this.killed= true;
 						process.destroy();
 						logWarning(handler, "Killed R node, because it didn't stop regularly.");
-						break;
 					}
-					continue;
 				}
+				catch (final Throwable e) {
+					logError(handler, "A runtime error occurred in StopMonitor.", e);
+				}
+			}
+			
+		}
+		final var monitor= new StopMonitor();
+		MONITOR_EXECUTOR.schedule(monitor, this.timeoutNanos, TimeUnit.NANOSECONDS);
+		try {
+			handler.shutdown();
+			
+			if (process != null) {
+				try {
+					process.waitFor();
+				}
+				catch (final InterruptedException e) {}
+			}
+		}
+		catch (final Throwable e) {
+			if (monitor.killed && e instanceof UnmarshalException) {
+				// ignore
+			}
+			else {
+				logWarning(handler, Messages.ShutdownNode_error_message, e);
 			}
 		}
 		
@@ -732,4 +747,9 @@
 		log(handler, Status.WARNING, mainMessage, null, null);
 	}
 	
+	private void logError(final NodeHandler handler, final String mainMessage,
+			final @Nullable Throwable e) {
+		log(handler, Status.ERROR, mainMessage, null, e);
+	}
+	
 }
diff --git a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeManager.java b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeManager.java
index c50270a..2b97605 100644
--- a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeManager.java
+++ b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeManager.java
@@ -110,10 +110,8 @@
 		}
 		CommonsRuntime.getEnvironment().removeStoppingListener(this);
 		if (this.inUse) {
-			returnRServi(this.accessId);
-			if (this.handler == null) {
-				return;
-			}
+			this.inUse= false;
+			this.accessId++;
 		}
 		final ThisNodeHandler handler= nonNullAssert(this.handler);
 		this.handler= null;
diff --git a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/rj/servi/node/RServiNodeConfig.java b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/rj/servi/node/RServiNodeConfig.java
index 605f7e1..d9faf4a 100644
--- a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/rj/servi/node/RServiNodeConfig.java
+++ b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/rj/servi/node/RServiNodeConfig.java
@@ -77,7 +77,7 @@
 	 */
 	public static final String STARTSTOP_TIMEOUT__ID= "startstop_timeout.millis";
 	
-	private static final long STARTSTOP_TIMEOUT_DEFAULT= 30 * 1000;
+	private static final long STARTSTOP_TIMEOUT_DEFAULT= 10 * 1000;
 	
 	
 	private @Nullable String rHome;
diff --git a/servi/org.eclipse.statet.rj.servi/srcServer/org/eclipse/statet/internal/rj/servi/server/NodeServer.java b/servi/org.eclipse.statet.rj.servi/srcServer/org/eclipse/statet/internal/rj/servi/server/NodeServer.java
index b9951ab..07a4a72 100644
--- a/servi/org.eclipse.statet.rj.servi/srcServer/org/eclipse/statet/internal/rj/servi/server/NodeServer.java
+++ b/servi/org.eclipse.statet.rj.servi/srcServer/org/eclipse/statet/internal/rj/servi/server/NodeServer.java
@@ -14,7 +14,6 @@
 
 package org.eclipse.statet.internal.rj.servi.server;
 
-import java.io.PrintStream;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Path;
 import java.rmi.ConnectException;
@@ -24,8 +23,6 @@
 import java.rmi.server.UnicastRemoteObject;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.logging.Level;
 
 import javax.security.auth.login.LoginException;
@@ -58,30 +55,30 @@
 public class NodeServer extends SrvEngineServer {
 	
 	
-	class ConsoleDummy extends Thread {
+	class ConsoleMockup extends Thread {
 		
 		private final Client client;
-		private final PrintStream out;
 		
 		private final MainCmdC2SList c2sList= new MainCmdC2SList();
 		
-		ConsoleDummy(final Client client) {
+		private int waitMillis= 5000;
+		
+		ConsoleMockup() {
 			setName("R Console");
 			setDaemon(true);
 			setPriority(NORM_PRIORITY-1);
-			this.client= client;
-			this.out= System.out;
+			this.client= NodeServer.this.consoleMockupClient;
 		}
 		
 		@Override
 		public void run() {
 			try {
 				synchronized (NodeServer.this.srvEngine) {
-					if (NodeServer.this.isConsoleEnabled || NodeServer.this.isConsoleDummyRunning) {
+					if (NodeServer.this.isConsoleEnabled || NodeServer.this.consoleMockup != null) {
 						return;
 					}
+					NodeServer.this.consoleMockup= this;
 					NodeServer.this.srvEngine.connect(this.client, new HashMap<String, Object>());
-					NodeServer.this.isConsoleDummyRunning= true;
 				}
 				
 				RjsComObject sendCom= null;
@@ -103,25 +100,23 @@
 							case RjsComObject.T_MAIN_LIST:
 								MainCmdItem item= ((MainCmdS2CList) receivedCom).getItems();
 								MainCmdItem tmp;
-								ITER_ITEMS : for (; (item != null); tmp= item, item= item.next, tmp.next= null) {
-									switch (item.getCmdType()) {
-									case MainCmdItem.T_CONSOLE_WRITE_ITEM:
-										this.out.println("R-OUT (" + item.getOp() + "): " + item.getDataText());
-										break;
-									case MainCmdItem.T_CONSOLE_READ_ITEM:
-										this.out.println("R-PROMPT: " + item.getDataText());
-										break;
-									}
+								for (; (item != null); tmp= item, item= item.next, tmp.next= null) {
+									processServerCmdItem(item);
 								}
 								break;
 							case RjsComObject.T_STATUS:
-								switch (((RjsStatus) receivedCom).getCode()) {
+								switch (((RjsStatus)receivedCom).getCode()) {
 								case Server.S_DISCONNECTED:
 									throw new ConnectException("");
 								case Server.S_LOST:
 								case Server.S_NOT_STARTED:
 								case Server.S_STOPPED:
-									return;
+									synchronized (NodeServer.this.srvEngine) {
+										if (NodeServer.this.consoleMockup == this) {
+											NodeServer.this.consoleMockup= null;
+										}
+										return;
+									}
 								}
 							}
 						}
@@ -129,7 +124,9 @@
 					catch (final ConnectException e) {
 						synchronized (NodeServer.this.srvEngine) {
 							if (NodeServer.this.isConsoleEnabled) {
-								NodeServer.this.isConsoleDummyRunning= false;
+								if (NodeServer.this.consoleMockup == this) {
+									NodeServer.this.consoleMockup= null;
+								}
 								return;
 							}
 							NodeServer.this.srvEngine.connect(this.client, new HashMap<String, Object>());
@@ -146,20 +143,25 @@
 					}
 					if (sendCom == null) {
 						try {
-							sleep(5000);
+							sleep(this.waitMillis);
 						}
 						catch (final InterruptedException e) {
 						}
 					}
 				}
 			}
-			catch (final Exception e) {
+			catch (final Throwable e) {
 				LOGGER.log(Level.SEVERE,
 						"An error occurred when running dummy R REPL. Stopping REPL.",
 						e );
 			}
 		}
 		
+		public void aboutToStop() {
+			this.waitMillis= 100;
+			interrupt();
+		}
+		
 	}
 	
 	class Node implements RServiNode {
@@ -182,8 +184,8 @@
 					NodeServer.this.authMethod= new NoAuthMethod("<internal>");
 					enabled= NodeServer.this.isConsoleEnabled= false;
 //					LOGGER.fine("before start");
-					if (!NodeServer.this.isConsoleDummyRunning) {
-						new ConsoleDummy(NodeServer.this.consoleDummyClient).start();
+					if (NodeServer.this.consoleMockup == null) {
+						new ConsoleMockup().start();
 					}
 //					LOGGER.fine("after start");
 				}
@@ -278,11 +280,11 @@
 	
 	
 	private boolean isConsoleEnabled;
-	private boolean isConsoleDummyRunning;
 	
 	private final ServerAuthMethod rserviAuthMethod;
 	
-	private final Client consoleDummyClient;
+	private final Client consoleMockupClient;
+	private @Nullable ConsoleMockup consoleMockup;
 	
 	private @Nullable String currentClientLabel;
 	private @Nullable Backend currentClientBackend;
@@ -296,7 +298,7 @@
 	public NodeServer(final RMIServerControl control) {
 		super(control, new NoAuthMethod("<internal>")); //$NON-NLS-1$
 		this.rserviAuthMethod= new NoAuthMethod("<internal>"); //$NON-NLS-1$
-		this.consoleDummyClient= new Client("-", "dummy", (byte)0); //$NON-NLS-1$
+		this.consoleMockupClient= new Client("-", "dummy", (byte)0); //$NON-NLS-1$
 	}
 	
 	
@@ -322,7 +324,7 @@
 		
 		final Map<String, Object> properties= new HashMap<>();
 		properties.put("args", new String[0]);
-		this.srvEngine.start(this.consoleDummyClient, properties);
+		this.srvEngine.start(this.consoleMockupClient, properties);
 		
 		try {
 			synchronized (this.serviRunLock) {
@@ -419,22 +421,36 @@
 	
 	void shutdown() {
 		this.control.checkCleanup();
-		new Timer(true).schedule(new TimerTask() {
-			@Override
-			public void run() {
-				try {
-					unbindClient();
-				}
-				catch (final Exception e) {
-					LOGGER.log(Level.SEVERE,
-							"An error occurred when unbinding the client for shutdown.",
-							e );
-				}
-				finally {
-					Runtime.getRuntime().exit(0);
-				}
+		
+		try {
+			LOGGER.log(Level.FINE, "Shutting down R node: Unbind client...");
+			unbindClient();
+		}
+		catch (final Exception e) {
+			LOGGER.log(Level.SEVERE,
+					"An error occurred when unbinding the client for shutdown.",
+					e );
+		}
+		
+		final ConsoleMockup consoleMockup= this.consoleMockup;
+		if (consoleMockup != null) {
+			consoleMockup.aboutToStop();
+		}
+		try {
+			LOGGER.log(Level.FINE, "Shutting down R node: Exit R engine...");
+			synchronized (this.serviRunLock) {
+				runServerLoopCommand(new DataCmdItem(DataCmdItem.EVAL_EXPR_VOID, 0,
+						"base::q(\"no\")", null, null, null ));
 			}
-		}, 500L );
+		}
+		catch (final RjExitException e) {
+			// expected
+		}
+		catch (final Exception e) {
+			LOGGER.log(Level.SEVERE,
+					"An error occurred when exiting the R engine for shutdown.",
+					e );
+		}
 	}
 	
 	public void setProperties(final Map<String, ? extends @NonNull Object> properties, final Object caller) throws RemoteException {