Bug 571160: [RJ-Servi] Improve logging when stopping R nodes

  - Add unit tests for evict/shutdown of nodes under different
    conditions

Change-Id: I67ce44dcd431483ab3e6b0f059ddcca8963389ab
diff --git a/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/JMPoolTest.java b/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/JMPoolTest.java
index 67732ef..2ae7870 100644
--- a/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/JMPoolTest.java
+++ b/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/JMPoolTest.java
@@ -17,6 +17,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import static org.eclipse.statet.jcommons.lang.ObjectUtils.nonNullAssert;
@@ -27,8 +28,10 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 
 import javax.security.auth.login.LoginException;
@@ -47,15 +50,16 @@
 import org.eclipse.statet.jcommons.status.Status;
 import org.eclipse.statet.jcommons.status.StatusException;
 
+import org.eclipse.statet.internal.rj.servi.NodeHandler;
 import org.eclipse.statet.internal.rj.servi.RServiImpl;
 import org.eclipse.statet.rj.data.RDataUtils;
 import org.eclipse.statet.rj.server.client.AbstractRJComClient;
 import org.eclipse.statet.rj.server.util.RJContext;
-import org.eclipse.statet.rj.servi.Accessor;
 import org.eclipse.statet.rj.servi.RServi;
 import org.eclipse.statet.rj.servi.RServiUtils;
 import org.eclipse.statet.rj.servi.jmx.PoolStatusMX;
 import org.eclipse.statet.rj.servi.node.RServiNodeConfig;
+import org.eclipse.statet.rj.servi.test.Accessor;
 
 
 @EnabledIfEnvironmentVariable(named= "STATET_TEST_FILES", matches= ".+")
@@ -435,6 +439,107 @@
 		assertNull(node1.getClientLabel());
 	}
 	
+	@Test
+	public void PoolNode_evict() throws Exception {
+		final PoolConfig poolConfig= new PoolConfig();
+		poolConfig.setMinIdleCount(0);
+		this.server.setPoolConfig(poolConfig);
+		this.server.start();
+		
+		final RServi servi1= getServi("test1");
+		assertNodeOperative(servi1);
+		
+		final PoolNodeObject node1= assertSinglePoolNode();
+		closeServi(servi1);
+		node1.evict(0);
+		
+		skipChecking(node1);
+		assertEquals(PoolNodeState.DISPOSED, node1.getState());
+	}
+	
+	@Test
+	public void PoolNode_evictAllocated() throws Exception {
+		final PoolConfig poolConfig= new PoolConfig();
+		poolConfig.setMinIdleCount(0);
+		this.server.setPoolConfig(poolConfig);
+		this.server.start();
+		
+		final RServi servi1= getServi("test1");
+		assertNodeOperative(servi1);
+		
+		final PoolNodeObject node1= assertSinglePoolNode();
+		assertEquals(PoolNodeState.ALLOCATED, node1.getState());
+		node1.evict(0);
+		
+		skipChecking(node1);
+		assertEquals(PoolNodeState.DISPOSED, node1.getState());
+		
+		assertThrows(StatusException.class, () -> closeServi(servi1));
+	}
+	
+	@Test
+	public void PoolNode_evictBusy() throws Exception {
+		final PoolConfig poolConfig= new PoolConfig();
+		poolConfig.setMinIdleCount(0);
+		this.server.setPoolConfig(poolConfig);
+		this.server.start();
+		
+		final RServi servi1= getServi("test1");
+		
+		final ProgressMonitor m= new NullProgressMonitor();
+		assertNodeOperative(servi1, m);
+		
+		final var clientState= new AtomicInteger();
+		final var clientWorker= CompletableFuture.supplyAsync(() -> {
+			clientState.set(1);
+			try {
+				servi1.evalVoid("while (true) { Sys.sleep(1) }", m);
+				return null;
+			}
+			catch (final StatusException e) {
+				return e;
+			}
+			finally {
+				clientState.set(2);
+			}
+		});
+		while (clientState.get() == 0) {
+			Thread.sleep(100);
+		}
+		final PoolNodeObject node1= assertSinglePoolNode();
+		assertEquals(PoolNodeState.ALLOCATED, node1.getState());
+		node1.evict(0);
+		
+		skipChecking(node1);
+		assertEquals(PoolNodeState.DISPOSED, node1.getState());
+		
+		assertTrue((clientWorker.get(1, TimeUnit.SECONDS) instanceof StatusException));
+		assertThrows(StatusException.class, () -> closeServi(servi1));
+	}
+	
+	@Test
+	public void PoolNode_evictCrashed() throws Exception {
+		final PoolConfig poolConfig= new PoolConfig();
+		poolConfig.setMinIdleCount(0);
+		this.server.setPoolConfig(poolConfig);
+		this.server.start();
+		
+		final RServi servi1= getServi("test1");
+		assertNodeOperative(servi1);
+		
+		final PoolNodeObject node1= assertSinglePoolNode();
+		final Accessor nodeHandler= new Accessor(node1.getNodeHandler(), NodeHandler.class);
+		final Process process= nonNullAssert((Process)nodeHandler.getFieldObj("process"));
+		process.destroy();
+		
+		node1.evict(0);
+		
+		skipChecking(node1);
+		assertEquals(PoolNodeState.DISPOSED, node1.getState());
+		
+		assertThrows(StatusException.class, () -> closeServi(servi1));
+	}
+	
 	
 	@Test
 	public void Bug570660() throws Exception {
diff --git a/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/Accessor.java b/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/test/Accessor.java
similarity index 97%
rename from servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/Accessor.java
rename to servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/test/Accessor.java
index 202af6f..8e0d7f9 100644
--- a/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/Accessor.java
+++ b/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/test/Accessor.java
@@ -12,7 +12,7 @@
  #     Stephan Wahlbrink <sw@wahlbrink.eu> - initial API and implementation
  #=============================================================================*/
 
-package org.eclipse.statet.rj.servi;
+package org.eclipse.statet.rj.servi.test;
 
 import static org.eclipse.statet.jcommons.lang.ObjectUtils.nonNullAssert;
 
diff --git a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeFactory.java b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeFactory.java
index e8f4171..7515d69 100644
--- a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeFactory.java
+++ b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeFactory.java
@@ -18,14 +18,18 @@
 import static org.eclipse.statet.jcommons.lang.SystemUtils.OS_MAC;
 import static org.eclipse.statet.jcommons.lang.SystemUtils.OS_WIN;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.nio.channels.Channels;
+import java.nio.channels.SeekableByteChannel;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.InvalidPathException;
 import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
 import java.rmi.NotBoundException;
 import java.rmi.UnmarshalException;
 import java.util.ArrayList;
@@ -38,6 +42,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
 import org.eclipse.statet.jcommons.collections.CollectionUtils;
@@ -109,6 +114,8 @@
 	private static final ScheduledExecutorService MONITOR_EXECUTOR= Executors.newSingleThreadScheduledExecutor(
 			new DaemonThreadFactory("LocalNodeFactory-Monitor") );
 	
+	private static final String NODELOG_FILENAME= "out.log"; //$NON-NLS-1$
+	
 	
 	private static class ProcessConfig {
 		
@@ -571,15 +578,15 @@
 		catch (final Exception e) {
 			final StringBuilder sb= new StringBuilder("Error starting R node:");
 			if (processBuilder != null) {
-				sb.append("\n<COMMAND>");
+				sb.append("\n<COMMAND>"); //$NON-NLS-1$
 				ServerUtils.prettyPrint(processBuilder.command(), sb);
-				sb.append("\n</COMMAND>");
+				sb.append("\n</COMMAND>"); //$NON-NLS-1$
 			}
 			if (process != null) {
 				final char[] buffer= new char[4096];
 				final InputStream stdout= process.getInputStream();
 				{
-					sb.append("\n<STDOUT>\n");
+					sb.append("\n<STDOUT>\n"); //$NON-NLS-1$
 					try (final var reader= new InputStreamReader(stdout)) {
 						int n;
 						try { // read non-blocking
@@ -597,9 +604,9 @@
 					catch (final IOException ignore) {}
 					sb.append("</STDOUT>");
 				}
-				final Path logFile= handler.dir.resolve("out.log");
+				final Path logFile= handler.dir.resolve(NODELOG_FILENAME);
 				if (Files.exists(logFile)) {
-					sb.append("\n<LOG file=\"out.log\">\n");
+					sb.append("\n<LOG file=\"" + NODELOG_FILENAME + "\">\n");
 					try (final var reader= new InputStreamReader(Files.newInputStream(logFile),
 							StandardCharsets.UTF_8 )) {
 						int n;
@@ -617,7 +624,7 @@
 					}
 					sb.append("</LOG>");
 				}
-				sb.append("\n--------");
+				sb.append("\n--------"); //$NON-NLS-1$
 			}
 			
 			Thread.interrupted();
@@ -652,28 +659,34 @@
 		final Process process= handler.process;
 		handler.process= null;
 		
-		class StopMonitor implements Runnable {
-			
-			volatile boolean killed;
-			
-			@Override
-			public synchronized void run() {
-				try {
-					if (process != null && process.isAlive()) {
-						this.killed= true;
-						process.destroy();
-						logWarning(handler, "Killed R node, because it didn't stop regularly.");
-					}
+		final AtomicInteger exitType= new AtomicInteger(0);
+		try {
+			if (process != null) {
+				if (!process.isAlive()) {
+					exitType.set(1);
+					logWarning(handler, "R node is already down (process crashed?).",
+							(message) -> {
+								message.addProp("nodeExitCode", process.exitValue());
+								message.addProp("nodeLog", readLogTail(handler));
+							},
+							null );
 				}
-				catch (final Throwable e) {
-					logError(handler, "A runtime error occurred in StopMonitor.", e);
+				else {
+					MONITOR_EXECUTOR.schedule(() -> {
+						try {
+							if (process.isAlive()) {
+								exitType.set(2);
+								process.destroy();
+								logWarning(handler, "Killed R node, because it did not shut down regularly.");
+							}
+						}
+						catch (final Throwable e) {
+							logError(handler, "A runtime error occurred in StopMonitor during shutdown of R node.", e);
+						}
+					}, this.timeoutNanos, TimeUnit.NANOSECONDS);
 				}
 			}
 			
-		}
-		final var monitor= new StopMonitor();
-		MONITOR_EXECUTOR.schedule(monitor, this.timeoutNanos, TimeUnit.NANOSECONDS);
-		try {
 			handler.shutdown();
 			
 			if (process != null) {
@@ -684,11 +697,11 @@
 			}
 		}
 		catch (final Throwable e) {
-			if (monitor.killed && e instanceof UnmarshalException) {
+			if (exitType.get() != 0 && e instanceof UnmarshalException) {
 				// ignore
 			}
 			else {
-				logWarning(handler, Messages.ShutdownNode_error_message, e);
+				logWarning(handler, "An error occurred when trying to shut down R node.", e);
 			}
 		}
 		
@@ -714,12 +727,41 @@
 	}
 	
 	
+	private static final int LOG_NODELOG_MAX= 50_000;
+	
+	private String readLogTail(final NodeHandler handler) {
+		final Path logFile= handler.dir.resolve(NODELOG_FILENAME);
+		try {
+			final long fileSize= Files.size(logFile);
+			if (fileSize < LOG_NODELOG_MAX) {
+				return Files.readString(logFile, StandardCharsets.UTF_8).trim();
+			}
+			else {
+				try (final SeekableByteChannel channel= Files.newByteChannel(logFile,
+						StandardOpenOption.READ)) {
+					channel.position(fileSize - LOG_NODELOG_MAX - 1);
+					final var stream= Channels.newInputStream(channel);
+					int b;
+					while ((b= stream.read()) != -1 && b != '\n') {
+					}
+					final ByteArrayOutputStream bytes= new ByteArrayOutputStream(LOG_NODELOG_MAX);
+					stream.transferTo(bytes);
+					return bytes.toString(StandardCharsets.UTF_8).trim();
+				}
+			}
+		}
+		catch (final IOException e) {
+			return String.format("<error: %1$s>", e.getMessage()); //$NON-NLS-1$
+		}
+	}
+	
+	
 	private void log(final NodeHandler handler,
 			final byte severity, final String mainMessage,
 			final @Nullable Consumer<ToStringBuilder> messageCustomizer,
 			final @Nullable Throwable e) {
 		final var message= new ToStringBuilder(mainMessage);
-		message.addProp("nodeId", handler.nodeId);
+		message.addProp("nodeId", handler.nodeId); //$NON-NLS-1$
 		if (messageCustomizer != null) {
 			messageCustomizer.accept(message);
 		}
diff --git a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/Messages.java b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/Messages.java
index 5267eb4..9f3218c 100644
--- a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/Messages.java
+++ b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/Messages.java
@@ -28,7 +28,6 @@
 	public static String UnbindClient_error_message= "An exception was thrown when trying to unbind the client (passivate a node).";
 	public static String StartNode_error_message= "An exception was thrown when trying to start the node (make a node).";
 	public static String StartLocal_pub_error_message= "Cannot start the RServi instance.";
-	public static String ShutdownNode_error_message= "An exception was thrown when trying to shutdown the node (destroy a node).";
 	public static String RmiUnexportNode_error_message= "An exception was thrown when trying to unexport the node (destroy a node).";
 	
 }