Bug 569493: [RJ-Servi] Shutdown R engine of nodes gracefully

Backport-of: 731203f5f8dc8e93aa2faa07c4f8e7c1761ebcc5
Change-Id: Ia02cee9ce2c13fb02f7702bb8345074cf6707cde
diff --git a/core/org.eclipse.statet.rj.server/src/org/eclipse/statet/rj/server/srv/engine/SrvEngineServer.java b/core/org.eclipse.statet.rj.server/src/org/eclipse/statet/rj/server/srv/engine/SrvEngineServer.java
index 514b95d..9fc0199 100644
--- a/core/org.eclipse.statet.rj.server/src/org/eclipse/statet/rj/server/srv/engine/SrvEngineServer.java
+++ b/core/org.eclipse.statet.rj.server/src/org/eclipse/statet/rj/server/srv/engine/SrvEngineServer.java
@@ -27,6 +27,7 @@
 
 import javax.security.auth.login.LoginException;
 
+import org.eclipse.statet.rj.RjClosedException;
 import org.eclipse.statet.rj.RjException;
 import org.eclipse.statet.rj.data.RObject;
 import org.eclipse.statet.rj.server.DataCmdItem;
@@ -68,6 +69,17 @@
 	
 	protected static final Logger LOGGER= Logger.getLogger("org.eclipse.statet.rj.server");
 	
+	protected static final class RjExitException extends RjClosedException {
+		
+		private static final long serialVersionUID= 1L;
+		
+		public RjExitException(final String message) {
+			super(message);
+		}
+		
+	}
+	
+	
 	protected final RMIServerControl control;
 	protected SrvEngine srvEngine;
 	
@@ -226,7 +238,9 @@
 			sendCom= this.serverC2SList;
 			
 			WAIT_FOR_ANSWER: while (true) {
+//				System.out.println("\n>> SEND ======" + Thread.currentThread() + "\n" + sendItem + "==\n");
 				final RjsComObject receivedCom= runMainLoop(sendCom, null);
+//				System.out.println("\n<< RECEIVED ==" + Thread.currentThread() + "\n" + receivedCom + "==\n");
 				sendCom= null;
 				
 				COM_TYPE: switch (receivedCom.getComType()) {
@@ -234,7 +248,15 @@
 					sendCom= RjsStatus.OK_STATUS;
 					break COM_TYPE;
 				case RjsComObject.T_STATUS:
-					final RjsStatus status= (RjsStatus) receivedCom;
+					final RjsStatus status= (RjsStatus)receivedCom;
+					if ((status.getCode() & 0xffffff00) == 0) {
+						switch (status.getCode()) {
+						case Server.S_LOST:
+						case Server.S_NOT_STARTED:
+						case Server.S_STOPPED:
+							throw new RjExitException(status.getMessage());
+						}
+					}
 					switch (status.getSeverity()) {
 					case RjsStatus.OK:
 						break COM_TYPE;
@@ -256,6 +278,7 @@
 							answer= sendItem;
 							break COM_TYPE;
 						}
+						processServerCmdItem(item);
 						item= item.next;
 					}
 					break COM_TYPE;
@@ -276,6 +299,18 @@
 		}
 	}
 	
+	protected void processServerCmdItem(final MainCmdItem item) {
+		switch (item.getCmdType()) {
+		case MainCmdItem.T_CONSOLE_READ_ITEM:
+			LOGGER.log(Level.INFO, "R-PROMPT: " + item.getDataText());
+			break;
+		case MainCmdItem.T_CONSOLE_WRITE_ITEM:
+			LOGGER.log(Level.INFO, "R-OUT (" + item.getOp() + "): " + item.getDataText());
+			break;
+		}
+	}
+	
+	
 	protected RjsComObject runMainLoop(final RjsComObject com, final Object caller) throws RemoteException {
 		return this.srvEngine.runMainLoop(this.serverClient, com);
 	}
diff --git a/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/AbstractServiTest.java b/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/AbstractServiTest.java
index 3b6a144..45886c5 100644
--- a/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/AbstractServiTest.java
+++ b/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/AbstractServiTest.java
@@ -135,7 +135,9 @@
 		try {
 			for (final RServi servi : this.servis) {
 				try {
-					servi.close();
+					if (!servi.isClosed()) {
+						servi.close();
+					}
 				}
 				catch (final Throwable e) {
 					exceptions.add(e);
diff --git a/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/LocalNodeTest.java b/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/LocalNodeTest.java
index 508c784..b5eecfb 100644
--- a/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/LocalNodeTest.java
+++ b/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/LocalNodeTest.java
@@ -17,9 +17,13 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.concurrent.TimeUnit;
 
 import javax.management.OperationsException;
 import javax.security.auth.login.LoginException;
@@ -32,6 +36,7 @@
 import org.eclipse.statet.jcommons.rmi.RMIRegistry;
 import org.eclipse.statet.jcommons.rmi.RMIRegistryManager;
 import org.eclipse.statet.jcommons.status.NullProgressMonitor;
+import org.eclipse.statet.jcommons.status.ProgressMonitor;
 import org.eclipse.statet.jcommons.status.Status;
 import org.eclipse.statet.jcommons.status.StatusException;
 
@@ -154,4 +159,95 @@
 		closeServi(servi2);
 	}
 	
+	@Test
+	public void cleanup_WorkingDir() throws RjException, OperationsException,
+	NoSuchElementException, LoginException, StatusException, UnexpectedRDataException {
+		final ProgressMonitor m= new NullProgressMonitor();
+		
+		this.localR.start();
+		
+		final RServi servi1= getServi("test1");
+		
+		final Path wd= Paths.get(RDataUtils.checkSingleCharValue(servi1.evalData("getwd()", m)));
+		servi1.evalVoid("writeLines(\"Hello\", \"test.txt\")", m);
+		final Path file= wd.resolve("test.txt");
+		assertTrue(Files.isRegularFile(file));
+		
+		closeServi(servi1);
+		
+		final RServi servi2= getServi("test1");
+		assertTrue(Files.notExists(file));
+		
+		closeServi(servi2);
+	}
+	
+	@Test
+	public void cleanup_afterStop() throws RjException, OperationsException,
+	NoSuchElementException, LoginException, StatusException, UnexpectedRDataException {
+		final ProgressMonitor m= new NullProgressMonitor();
+		
+		this.localR.start();
+		
+		final RServi servi1= getServi("test1");
+		
+		final Path wd= Paths.get(RDataUtils.checkSingleCharValue(servi1.evalData("getwd()", m)));
+		servi1.evalVoid("writeLines(\"Hello\", \"test.txt\")", m);
+		final Path wdFile= wd.resolve("test.txt");
+		assertTrue(Files.isRegularFile(wdFile));
+		
+		final Path tempDir= Paths.get(RDataUtils.checkSingleCharValue(servi1.evalData("tempdir()", m)));
+		servi1.evalVoid("writeLines(\"Hello\", paste(tempdir(), \"test.txt\", sep=\"/\"))", m);
+		final Path tempFile= tempDir.resolve("test.txt");
+		assertTrue(Files.isRegularFile(tempFile));
+		
+		closeServi(servi1);
+		
+		this.localR.stop();
+		
+		for (int i= 0; i < 5 && !Files.notExists(wd); i++) {
+			try {
+				Thread.sleep(200);
+			}
+			catch (final InterruptedException e) {}
+		}
+		
+		assertTrue(Files.notExists(wdFile));
+		assertTrue(Files.notExists(wd));
+		assertTrue(Files.notExists(tempFile));
+		assertTrue(Files.notExists(tempDir));
+	}
+	
+	@Test
+	public void timeout_stop() throws RjException, OperationsException,
+			NoSuchElementException, LoginException, StatusException, UnexpectedRDataException {
+		final ProgressMonitor m= new NullProgressMonitor();
+		
+		this.localR.start();
+		
+		final RServi servi1= getServi("test1");
+		
+		final Thread blocker= new Thread() {
+			@Override
+			public void run() {
+				try {
+					servi1.evalVoid("Sys.sleep(20L)", m);
+				}
+				catch (final Exception e) {
+				}
+			}
+		};
+		blocker.start();
+		try {
+			Thread.sleep(500);
+		}
+		catch (final InterruptedException e) {}
+		
+		final long t1= System.nanoTime();
+		this.localR.stop();
+		final long d1= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t1);
+		final long dMax= new RServiNodeConfig().getStartStopTimeout() + 1500;
+		assertTrue(d1 < new RServiNodeConfig().getStartStopTimeout() + 1500,
+				() -> String.format("duration expected: < %1$sms, actual: %2$sms", dMax, d1) );
+	}
+	
 }
diff --git a/servi/org.eclipse.statet.rj.servi/META-INF/MANIFEST.MF b/servi/org.eclipse.statet.rj.servi/META-INF/MANIFEST.MF
index 7579ac3..18d4d06 100644
--- a/servi/org.eclipse.statet.rj.servi/META-INF/MANIFEST.MF
+++ b/servi/org.eclipse.statet.rj.servi/META-INF/MANIFEST.MF
@@ -13,6 +13,7 @@
 Import-Package: org.apache.commons.pool2;version="[2.6.2,3.0.0)";resolution:=optional,
  org.apache.commons.pool2.impl;version="[2.6.2,3.0.0)";resolution:=optional,
  org.eclipse.statet.jcommons.collections;version="4.2.0",
+ org.eclipse.statet.jcommons.concurrent;version="4.2.0",
  org.eclipse.statet.jcommons.lang;version="4.2.0",
  org.eclipse.statet.jcommons.rmi;version="4.2.0",
  org.eclipse.statet.jcommons.runtime;version="4.2.0",
diff --git a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeFactory.java b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeFactory.java
index 09cead0..5dcac0d 100644
--- a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeFactory.java
+++ b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeFactory.java
@@ -27,18 +27,22 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.rmi.NotBoundException;
+import java.rmi.UnmarshalException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
 import org.eclipse.statet.jcommons.collections.CollectionUtils;
 import org.eclipse.statet.jcommons.collections.ImCollections;
 import org.eclipse.statet.jcommons.collections.ImList;
+import org.eclipse.statet.jcommons.concurrent.CommonThreadFactory;
 import org.eclipse.statet.jcommons.lang.NonNullByDefault;
 import org.eclipse.statet.jcommons.lang.Nullable;
 import org.eclipse.statet.jcommons.lang.SystemUtils;
@@ -95,6 +99,15 @@
 		return args;
 	}
 	
+	private static final ScheduledExecutorService MONITOR_EXECUTOR= Executors.newSingleThreadScheduledExecutor(
+			new CommonThreadFactory("LocalNodeFactory-Monitor") {
+				@Override
+				protected boolean getDaemon() {
+					return true;
+				}
+			});
+	
+	
 	private static class ProcessConfig {
 		final Map<String, String> addEnv= new HashMap<>();
 		final List<String> command= new ArrayList<>();
@@ -119,7 +132,7 @@
 	
 	private boolean verbose;
 	
-	private long timeoutNanos= TimeUnit.SECONDS.toNanos(30);
+	private long timeoutNanos= TimeUnit.SECONDS.toNanos(10);
 	
 	private final List<String> sslPropertyArgs;
 	
@@ -629,39 +642,46 @@
 	
 	@Override
 	public void stopNode(final NodeHandler handler) {
-		final long t= System.nanoTime();
-		final long timeout= this.timeoutNanos;
-		
-		try {
-			handler.shutdown();
-		}
-		catch (final Throwable e) {
-			Utils.logWarning(Messages.ShutdownNode_error_message, e);
-		}
-		
 		final Process process= handler.process;
 		handler.process= null;
-		if (process != null) {
-			for (int i= 0; i < Integer.MAX_VALUE; i++) {
+		
+		class StopMonitor implements Runnable {
+			
+			volatile boolean killed;
+			
+			@Override
+			public synchronized void run() {
 				try {
-					Thread.sleep(250);
-				}
-				catch (final InterruptedException e) {
-				}
-				try {
-					process.exitValue();
-					break;
-				}
-				catch (final IllegalThreadStateException e) {
-					final long diff= System.nanoTime() - t;
-					if (i >= 10 && timeout >= 0 && diff > timeout) {
+					if (process != null && process.isAlive()) {
+						this.killed= true;
 						process.destroy();
-						Utils.logWarning(String.format("Killed RServi node '%1$s'.",
-								(handler.address != null) ? handler.address.getName() : "<address not available>" ));
-						break;
+						Utils.logWarning("Killed R node, because it didn't stop regularly.");
 					}
-					continue;
 				}
+				catch (final Throwable e) {
+					Utils.logError("A runtime error occurred in StopMonitor.", e);
+				}
+			}
+			
+		}
+		final StopMonitor monitor= new StopMonitor();
+		MONITOR_EXECUTOR.schedule(monitor, this.timeoutNanos, TimeUnit.NANOSECONDS);
+		try {
+			handler.shutdown();
+			
+			if (process != null) {
+				try {
+					process.waitFor();
+				}
+				catch (final InterruptedException e) {}
+			}
+		}
+		catch (final Throwable e) {
+			if (monitor.killed && e instanceof UnmarshalException) {
+				// ignore
+			}
+			else {
+				Utils.logWarning(Messages.ShutdownNode_error_message, e);
 			}
 		}
 		
diff --git a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeManager.java b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeManager.java
index 433bb10..1c478f4 100644
--- a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeManager.java
+++ b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeManager.java
@@ -110,10 +110,8 @@
 		}
 		CommonsRuntime.getEnvironment().removeStoppingListener(this);
 		if (this.inUse) {
-			returnRServi(this.accessId);
-			if (this.handler == null) {
-				return;
-			}
+			this.inUse= false;
+			this.accessId++;
 		}
 		final ThisNodeHandler handler= nonNullAssert(this.handler);
 		this.handler= null;
diff --git a/servi/org.eclipse.statet.rj.servi/srcServer/org/eclipse/statet/internal/rj/servi/server/NodeServer.java b/servi/org.eclipse.statet.rj.servi/srcServer/org/eclipse/statet/internal/rj/servi/server/NodeServer.java
index 0047f49..b2173d6 100644
--- a/servi/org.eclipse.statet.rj.servi/srcServer/org/eclipse/statet/internal/rj/servi/server/NodeServer.java
+++ b/servi/org.eclipse.statet.rj.servi/srcServer/org/eclipse/statet/internal/rj/servi/server/NodeServer.java
@@ -23,12 +23,12 @@
 import java.rmi.server.UnicastRemoteObject;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.logging.Level;
 
 import javax.security.auth.login.LoginException;
 
+import org.eclipse.statet.jcommons.lang.Nullable;
+
 import org.eclipse.statet.rj.RjException;
 import org.eclipse.statet.rj.server.DataCmdItem;
 import org.eclipse.statet.rj.server.MainCmdC2SList;
@@ -52,18 +52,20 @@
 public class NodeServer extends SrvEngineServer {
 	
 	
-	class ConsoleDummy extends Thread {
+	class ConsoleMockup extends Thread {
 		
 		private final Client client;
 		private final PrintStream out;
 		
 		private final MainCmdC2SList c2sList= new MainCmdC2SList();
 		
-		ConsoleDummy(final Client client) {
+		private int waitMillis= 5000;
+		
+		ConsoleMockup() {
 			setName("R Console");
 			setDaemon(true);
 			setPriority(NORM_PRIORITY-1);
-			this.client= client;
+			this.client= NodeServer.this.consoleMockupClient;
 			this.out= System.out;
 		}
 		
@@ -71,11 +73,11 @@
 		public void run() {
 			try {
 				synchronized (NodeServer.this.srvEngine) {
-					if (NodeServer.this.isConsoleEnabled || NodeServer.this.isConsoleDummyRunning) {
+					if (NodeServer.this.isConsoleEnabled || NodeServer.this.consoleMockup != null) {
 						return;
 					}
+					NodeServer.this.consoleMockup= this;
 					NodeServer.this.srvEngine.connect(this.client, new HashMap<String, Object>());
-					NodeServer.this.isConsoleDummyRunning= true;
 				}
 				
 				RjsComObject sendCom= null;
@@ -109,13 +111,18 @@
 								}
 								break;
 							case RjsComObject.T_STATUS:
-								switch (((RjsStatus) receivedCom).getCode()) {
+								switch (((RjsStatus)receivedCom).getCode()) {
 								case Server.S_DISCONNECTED:
 									throw new ConnectException("");
 								case Server.S_LOST:
 								case Server.S_NOT_STARTED:
 								case Server.S_STOPPED:
-									return;
+									synchronized (NodeServer.this.srvEngine) {
+										if (NodeServer.this.consoleMockup == this) {
+											NodeServer.this.consoleMockup= null;
+										}
+										return;
+									}
 								}
 							}
 						}
@@ -123,7 +130,9 @@
 					catch (final ConnectException e) {
 						synchronized (NodeServer.this.srvEngine) {
 							if (NodeServer.this.isConsoleEnabled) {
-								NodeServer.this.isConsoleDummyRunning= false;
+								if (NodeServer.this.consoleMockup == this) {
+									NodeServer.this.consoleMockup= null;
+								}
 								return;
 							}
 							NodeServer.this.srvEngine.connect(this.client, new HashMap<String, Object>());
@@ -138,7 +147,7 @@
 					}
 					if (sendCom == null) {
 						try {
-							sleep(5000);
+							sleep(this.waitMillis);
 						}
 						catch (final InterruptedException e) {
 						}
@@ -150,6 +159,11 @@
 			}
 		}
 		
+		public void aboutToStop() {
+			this.waitMillis= 100;
+			interrupt();
+		}
+		
 	}
 	
 	class Node implements RServiNode {
@@ -172,8 +186,8 @@
 					NodeServer.this.authMethod= new NoAuthMethod("<internal>");
 					enabled= NodeServer.this.isConsoleEnabled= false;
 //					LOGGER.fine("before start");
-					if (!NodeServer.this.isConsoleDummyRunning) {
-						new ConsoleDummy(NodeServer.this.consoleDummyClient).start();
+					if (NodeServer.this.consoleMockup == null) {
+						new ConsoleMockup().start();
 					}
 //					LOGGER.fine("after start");
 				}
@@ -268,11 +282,11 @@
 	
 	
 	private boolean isConsoleEnabled;
-	private boolean isConsoleDummyRunning;
 	
 	private final ServerAuthMethod rserviAuthMethod;
 	
-	private final Client consoleDummyClient;
+	private final Client consoleMockupClient;
+	private @Nullable ConsoleMockup consoleMockup;
 	
 	private String currentClientLabel;
 	private Backend currentClientBackend;
@@ -286,7 +300,7 @@
 	public NodeServer(final RMIServerControl control) {
 		super(control, new NoAuthMethod("<internal>")); //$NON-NLS-1$
 		this.rserviAuthMethod= new NoAuthMethod("<internal>"); //$NON-NLS-1$
-		this.consoleDummyClient= new Client("-", "dummy", (byte) 0); //$NON-NLS-1$
+		this.consoleMockupClient= new Client("-", "dummy", (byte)0); //$NON-NLS-1$
 	}
 	
 	
@@ -312,7 +326,7 @@
 		
 		final Map<String, Object> properties= new HashMap<>();
 		properties.put("args", new String[0]);
-		this.srvEngine.start(this.consoleDummyClient, properties);
+		this.srvEngine.start(this.consoleMockupClient, properties);
 		
 		try {
 			synchronized (this.serviRunLock) {
@@ -403,20 +417,36 @@
 	
 	void shutdown() {
 		this.control.checkCleanup();
-		new Timer(true).schedule(new TimerTask() {
-			@Override
-			public void run() {
-				try {
-					unbindClient();
-				}
-				catch (final Exception e) {
-					e.printStackTrace();
-				}
-				finally {
-					System.exit(0);
-				}
+		
+		try {
+			LOGGER.log(Level.FINE, "Shutting down R node: Unbind client...");
+			unbindClient();
+		}
+		catch (final Exception e) {
+			LOGGER.log(Level.SEVERE,
+					"An error occurred when unbinding the client for shutdown.",
+					e );
+		}
+		
+		final ConsoleMockup consoleMockup= this.consoleMockup;
+		if (consoleMockup != null) {
+			consoleMockup.aboutToStop();
+		}
+		try {
+			LOGGER.log(Level.FINE, "Shutting down R node: Exit R engine...");
+			synchronized (this.serviRunLock) {
+				runServerLoopCommand(null, new DataCmdItem(DataCmdItem.EVAL_EXPR_VOID, 0,
+						"base::q(\"no\")", null, null, null ));
 			}
-		}, 500L);
+		}
+		catch (final RjExitException e) {
+			// expected
+		}
+		catch (final Exception e) {
+			LOGGER.log(Level.SEVERE,
+					"An error occurred when exiting the R engine for shutdown.",
+					e );
+		}
 	}
 	
 	public void setProperties(final Map<String, ? extends Object> properties, final Object caller) throws RemoteException {