Bug 571160: [RJ-Servi] Improve logging when stopping R nodes
- Add unit tests for evict/shutdown of nodes under different
conditions
Change-Id: I67ce44dcd431483ab3e6b0f059ddcca8963389ab
diff --git a/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/JMPoolTest.java b/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/JMPoolTest.java
index 67732ef..2ae7870 100644
--- a/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/JMPoolTest.java
+++ b/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/pool/JMPoolTest.java
@@ -17,6 +17,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.eclipse.statet.jcommons.lang.ObjectUtils.nonNullAssert;
@@ -27,8 +28,10 @@
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import javax.security.auth.login.LoginException;
@@ -47,15 +50,16 @@
import org.eclipse.statet.jcommons.status.Status;
import org.eclipse.statet.jcommons.status.StatusException;
+import org.eclipse.statet.internal.rj.servi.NodeHandler;
import org.eclipse.statet.internal.rj.servi.RServiImpl;
import org.eclipse.statet.rj.data.RDataUtils;
import org.eclipse.statet.rj.server.client.AbstractRJComClient;
import org.eclipse.statet.rj.server.util.RJContext;
-import org.eclipse.statet.rj.servi.Accessor;
import org.eclipse.statet.rj.servi.RServi;
import org.eclipse.statet.rj.servi.RServiUtils;
import org.eclipse.statet.rj.servi.jmx.PoolStatusMX;
import org.eclipse.statet.rj.servi.node.RServiNodeConfig;
+import org.eclipse.statet.rj.servi.test.Accessor;
@EnabledIfEnvironmentVariable(named= "STATET_TEST_FILES", matches= ".+")
@@ -435,6 +439,107 @@
assertNull(node1.getClientLabel());
}
+ /**
+  * Evicts a pool node after its client has been closed and expects the node
+  * to end up in state {@link PoolNodeState#DISPOSED}.
+  */
+ @Test
+ public void PoolNode_evict() throws Exception {
+     // minIdleCount= 0: presumably prevents additional idle nodes, so that the pool
+     // contains exactly the one node requested below -- confirm against PoolConfig
+     final PoolConfig config= new PoolConfig();
+     config.setMinIdleCount(0);
+     this.server.setPoolConfig(config);
+     this.server.start();
+
+     final RServi servi= getServi("test1");
+     assertNodeOperative(servi);
+
+     // return the node to the pool first, then evict it
+     final PoolNodeObject node= assertSinglePoolNode();
+     closeServi(servi);
+     node.evict(0);
+
+     skipChecking(node);
+     assertEquals(PoolNodeState.DISPOSED, node.getState());
+ }
+
+ /**
+  * Evicts a pool node while it is still allocated to a client. The node is
+  * expected to end up in state {@link PoolNodeState#DISPOSED} and closing the
+  * client afterwards is expected to fail with a {@link StatusException}.
+  */
+ @Test
+ public void PoolNode_evictAllocated() throws Exception {
+     final PoolConfig config= new PoolConfig();
+     config.setMinIdleCount(0);
+     this.server.setPoolConfig(config);
+     this.server.start();
+
+     final RServi servi= getServi("test1");
+     assertNodeOperative(servi);
+
+     // the client is not closed: the node must still be allocated when evicted
+     final PoolNodeObject node= assertSinglePoolNode();
+     assertEquals(PoolNodeState.ALLOCATED, node.getState());
+     node.evict(0);
+
+     skipChecking(node);
+     assertEquals(PoolNodeState.DISPOSED, node.getState());
+
+     assertThrows(StatusException.class, () -> closeServi(servi));
+ }
+
+ /**
+  * Evicts a pool node while a client evaluates an endless R loop on it. The
+  * node is expected to end up in state {@link PoolNodeState#DISPOSED}, the
+  * pending evaluation is expected to end with a {@link StatusException} and
+  * closing the client afterwards is expected to fail with a
+  * {@link StatusException}, too.
+  */
+ @Test
+ public void PoolNode_evictBusy() throws Exception {
+     final PoolConfig config= new PoolConfig();
+     config.setMinIdleCount(0);
+     this.server.setPoolConfig(config);
+     this.server.start();
+
+     final RServi servi= getServi("test1");
+
+     final ProgressMonitor monitor= new NullProgressMonitor();
+     assertNodeOperative(servi, monitor);
+
+     // phase of the background evaluation: 0= not started, 1= started, 2= finished
+     final var workerPhase= new AtomicInteger();
+     final var evalTask= CompletableFuture.supplyAsync(() -> {
+         workerPhase.set(1);
+         try {
+             servi.evalVoid("while (true) { Sys.sleep(1) }", monitor);
+             return null;
+         }
+         catch (final StatusException e) {
+             // hand the failure over to the test thread instead of failing the future
+             return e;
+         }
+         finally {
+             workerPhase.set(2);
+         }
+     });
+     // wait until the background task has at least been started
+     while (workerPhase.get() == 0) {
+         Thread.sleep(100);
+     }
+     final PoolNodeObject node= assertSinglePoolNode();
+     assertEquals(PoolNodeState.ALLOCATED, node.getState());
+     node.evict(0);
+
+     skipChecking(node);
+     assertEquals(PoolNodeState.DISPOSED, node.getState());
+
+     final Object evalResult= evalTask.get(1, TimeUnit.SECONDS);
+     assertTrue((evalResult instanceof StatusException));
+     assertThrows(StatusException.class, () -> closeServi(servi));
+ }
+
+ /**
+  * Evicts a pool node whose R process has already terminated ("crashed"). The
+  * node is expected to end up in state {@link PoolNodeState#DISPOSED} and
+  * closing the client afterwards is expected to fail with a
+  * {@link StatusException}.
+  */
+ @Test
+ public void PoolNode_evictCrashed() throws Exception {
+     final PoolConfig poolConfig= new PoolConfig();
+     poolConfig.setMinIdleCount(0);
+     this.server.setPoolConfig(poolConfig);
+     this.server.start();
+
+     final RServi servi1= getServi("test1");
+     assertNodeOperative(servi1);
+
+     // simulate the crash by killing the process of the node behind the back of the pool
+     final PoolNodeObject node1= assertSinglePoolNode();
+     final Accessor nodeHandler= new Accessor(node1.getNodeHandler(), NodeHandler.class);
+     final Process process= nonNullAssert((Process)nodeHandler.getFieldObj("process"));
+     process.destroy();
+     // destroy() only requests the termination and may return before the process is
+     // gone; wait for the exit, otherwise the eviction can still see a living process
+     // and the "node is already down" case is not reliably tested
+     assertTrue(process.waitFor(10, TimeUnit.SECONDS));
+
+     node1.evict(0);
+
+     skipChecking(node1);
+     assertEquals(PoolNodeState.DISPOSED, node1.getState());
+
+     assertThrows(StatusException.class, () -> closeServi(servi1));
+ }
+
@Test
public void Bug570660() throws Exception {
diff --git a/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/Accessor.java b/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/test/Accessor.java
similarity index 97%
rename from servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/Accessor.java
rename to servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/test/Accessor.java
index 202af6f..8e0d7f9 100644
--- a/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/Accessor.java
+++ b/servi/org.eclipse.statet.rj.servi-tests/src/org/eclipse/statet/rj/servi/test/Accessor.java
@@ -12,7 +12,7 @@
# Stephan Wahlbrink <sw@wahlbrink.eu> - initial API and implementation
#=============================================================================*/
-package org.eclipse.statet.rj.servi;
+package org.eclipse.statet.rj.servi.test;
import static org.eclipse.statet.jcommons.lang.ObjectUtils.nonNullAssert;
diff --git a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeFactory.java b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeFactory.java
index e8f4171..7515d69 100644
--- a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeFactory.java
+++ b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/LocalNodeFactory.java
@@ -18,14 +18,18 @@
import static org.eclipse.statet.jcommons.lang.SystemUtils.OS_MAC;
import static org.eclipse.statet.jcommons.lang.SystemUtils.OS_WIN;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.nio.channels.Channels;
+import java.nio.channels.SeekableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
import java.rmi.NotBoundException;
import java.rmi.UnmarshalException;
import java.util.ArrayList;
@@ -38,6 +42,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.eclipse.statet.jcommons.collections.CollectionUtils;
@@ -109,6 +114,8 @@
private static final ScheduledExecutorService MONITOR_EXECUTOR= Executors.newSingleThreadScheduledExecutor(
new DaemonThreadFactory("LocalNodeFactory-Monitor") );
+ private static final String NODELOG_FILENAME= "out.log"; //$NON-NLS-1$
+
private static class ProcessConfig {
@@ -571,15 +578,15 @@
catch (final Exception e) {
final StringBuilder sb= new StringBuilder("Error starting R node:");
if (processBuilder != null) {
- sb.append("\n<COMMAND>");
+ sb.append("\n<COMMAND>"); //$NON-NLS-1$
ServerUtils.prettyPrint(processBuilder.command(), sb);
- sb.append("\n</COMMAND>");
+ sb.append("\n</COMMAND>"); //$NON-NLS-1$
}
if (process != null) {
final char[] buffer= new char[4096];
final InputStream stdout= process.getInputStream();
{
- sb.append("\n<STDOUT>\n");
+ sb.append("\n<STDOUT>\n"); //$NON-NLS-1$
try (final var reader= new InputStreamReader(stdout)) {
int n;
try { // read non-blocking
@@ -597,9 +604,9 @@
catch (final IOException ignore) {}
sb.append("</STDOUT>");
}
- final Path logFile= handler.dir.resolve("out.log");
+ final Path logFile= handler.dir.resolve(NODELOG_FILENAME);
if (Files.exists(logFile)) {
- sb.append("\n<LOG file=\"out.log\">\n");
+ sb.append("\n<LOG file=\"" + NODELOG_FILENAME + "\">\n");
try (final var reader= new InputStreamReader(Files.newInputStream(logFile),
StandardCharsets.UTF_8 )) {
int n;
@@ -617,7 +624,7 @@
}
sb.append("</LOG>");
}
- sb.append("\n--------");
+ sb.append("\n--------"); //$NON-NLS-1$
}
Thread.interrupted();
@@ -652,28 +659,34 @@
final Process process= handler.process;
handler.process= null;
- class StopMonitor implements Runnable {
-
- volatile boolean killed;
-
- @Override
- public synchronized void run() {
- try {
- if (process != null && process.isAlive()) {
- this.killed= true;
- process.destroy();
- logWarning(handler, "Killed R node, because it didn't stop regularly.");
- }
+ final AtomicInteger exitType= new AtomicInteger(0);
+ try {
+ if (process != null) {
+ if (!process.isAlive()) {
+ exitType.set(1);
+ logWarning(handler, "R node is already down (process crashed?).",
+ (message) -> {
+ message.addProp("nodeExitCode", process.exitValue());
+ message.addProp("nodeLog", readLogTail(handler));
+ },
+ null );
}
- catch (final Throwable e) {
- logError(handler, "A runtime error occurred in StopMonitor.", e);
+ else {
+ MONITOR_EXECUTOR.schedule(() -> {
+ try {
+ if (process.isAlive()) {
+ exitType.set(2);
+ process.destroy();
+ logWarning(handler, "Killed R node, because it did not shut down regularly.");
+ }
+ }
+ catch (final Throwable e) {
+ logError(handler, "A runtime error occurred in StopMonitor during shutdown of R node.", e);
+ }
+ }, this.timeoutNanos, TimeUnit.NANOSECONDS);
}
}
- }
- final var monitor= new StopMonitor();
- MONITOR_EXECUTOR.schedule(monitor, this.timeoutNanos, TimeUnit.NANOSECONDS);
- try {
handler.shutdown();
if (process != null) {
@@ -684,11 +697,11 @@
}
}
catch (final Throwable e) {
- if (monitor.killed && e instanceof UnmarshalException) {
+ if (exitType.get() != 0 && e instanceof UnmarshalException) {
// ignore
}
else {
- logWarning(handler, Messages.ShutdownNode_error_message, e);
+ logWarning(handler, "An error occurred when trying to shut down R node.", e);
}
}
@@ -714,12 +727,41 @@
}
+ /** Maximum number of bytes of the node log file which are added to a log message. */
+ private static final int LOG_NODELOG_MAX= 50_000;
+
+ /**
+  * Reads the end of the log file ({@code NODELOG_FILENAME} in the directory of
+  * the node) for use in log messages.
+  *
+  * <p>A file of up to {@code LOG_NODELOG_MAX} bytes is returned completely. Of
+  * a larger file only the last bytes are returned, starting at the first line
+  * start found within that range; if the range contains no line break at all,
+  * the result is empty. The content is decoded as UTF-8, malformed bytes are
+  * replaced instead of reported as error, and the result is trimmed.</p>
+  *
+  * <p>The method never throws an {@link IOException}; a failure to read the
+  * file is reported inside the returned string.</p>
+  *
+  * @param handler the handler of the node
+  * @return the (tail of the) log content or a marker {@code <error: ...>}
+  */
+ private String readLogTail(final NodeHandler handler) {
+     final Path logFile= handler.dir.resolve(NODELOG_FILENAME);
+     try {
+         final long fileSize= Files.size(logFile);
+         // '<=' is required: with fileSize == LOG_NODELOG_MAX the tail branch would
+         // seek to position -1, which throws an unchecked IllegalArgumentException
+         if (fileSize <= LOG_NODELOG_MAX) {
+             // decode leniently (like the tail branch); Files.readString would fail
+             // for the complete log on a single malformed byte
+             return new String(Files.readAllBytes(logFile), StandardCharsets.UTF_8).trim();
+         }
+         try (final SeekableByteChannel channel= Files.newByteChannel(logFile,
+                 StandardOpenOption.READ )) {
+             // start one byte before the range, so that a line break directly in front
+             // of the range is detected and the full range is used
+             channel.position(fileSize - LOG_NODELOG_MAX - 1);
+             final var stream= Channels.newInputStream(channel);
+             // skip the (probably incomplete) first line
+             int b;
+             while ((b= stream.read()) != -1 && b != '\n') {
+             }
+             final ByteArrayOutputStream bytes= new ByteArrayOutputStream(LOG_NODELOG_MAX);
+             stream.transferTo(bytes);
+             return bytes.toString(StandardCharsets.UTF_8).trim();
+         }
+     }
+     catch (final IOException e) {
+         // include the exception type; the message alone can be just a file path
+         return String.format("<error: %1$s>", e); //$NON-NLS-1$
+     }
+ }
+
+
private void log(final NodeHandler handler,
final byte severity, final String mainMessage,
final @Nullable Consumer<ToStringBuilder> messageCustomizer,
final @Nullable Throwable e) {
final var message= new ToStringBuilder(mainMessage);
- message.addProp("nodeId", handler.nodeId);
+ message.addProp("nodeId", handler.nodeId); //$NON-NLS-1$
if (messageCustomizer != null) {
messageCustomizer.accept(message);
}
diff --git a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/Messages.java b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/Messages.java
index 5267eb4..9f3218c 100644
--- a/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/Messages.java
+++ b/servi/org.eclipse.statet.rj.servi/src/org/eclipse/statet/internal/rj/servi/Messages.java
@@ -28,7 +28,6 @@
public static String UnbindClient_error_message= "An exception was thrown when trying to unbind the client (passivate a node).";
public static String StartNode_error_message= "An exception was thrown when trying to start the node (make a node).";
public static String StartLocal_pub_error_message= "Cannot start the RServi instance.";
- public static String ShutdownNode_error_message= "An exception was thrown when trying to shutdown the node (destroy a node).";
public static String RmiUnexportNode_error_message= "An exception was thrown when trying to unexport the node (destroy a node).";
}