blob: 464a3a09309286dc5e75da5a8e564b263193b360 [file] [log] [blame]
# Copyright (c) 2009, 2021 Stephan Wahlbrink and others.
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
# which is available at https://www.apache.org/licenses/LICENSE-2.0.
# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
# Contributors:
# Stephan Wahlbrink <> - initial API and implementation
package org.eclipse.statet.internal.rj.servi;
import static org.eclipse.statet.jcommons.lang.ObjectUtils.nonNullAssert;
import static org.eclipse.statet.jcommons.lang.SystemUtils.OS_MAC;
import static org.eclipse.statet.jcommons.lang.SystemUtils.OS_WIN;
import java.nio.channels.Channels;
import java.nio.channels.SeekableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.rmi.NotBoundException;
import java.rmi.UnmarshalException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.eclipse.statet.jcommons.collections.CollectionUtils;
import org.eclipse.statet.jcommons.collections.ImCollections;
import org.eclipse.statet.jcommons.collections.ImList;
import org.eclipse.statet.jcommons.concurrent.DaemonThreadFactory;
import org.eclipse.statet.jcommons.lang.NonNullByDefault;
import org.eclipse.statet.jcommons.lang.Nullable;
import org.eclipse.statet.jcommons.lang.ObjectUtils.ToStringBuilder;
import org.eclipse.statet.jcommons.lang.SystemUtils;
import org.eclipse.statet.jcommons.rmi.RMIAddress;
import org.eclipse.statet.jcommons.rmi.RMIRegistry;
import org.eclipse.statet.jcommons.runtime.CommonsRuntime;
import org.eclipse.statet.jcommons.runtime.bundle.BundleEntry;
import org.eclipse.statet.jcommons.runtime.bundle.BundleSpec;
import org.eclipse.statet.jcommons.status.Status;
import org.eclipse.statet.jcommons.status.StatusException;
import org.eclipse.statet.rj.RjException;
import org.eclipse.statet.rj.RjInvalidConfigurationException;
import org.eclipse.statet.rj.server.Server;
import org.eclipse.statet.rj.server.ServerLogin;
import org.eclipse.statet.rj.server.util.LocalREnv;
import org.eclipse.statet.rj.server.util.RJContext;
import org.eclipse.statet.rj.server.util.ServerUtils;
import org.eclipse.statet.rj.servi.RServiUtils;
import org.eclipse.statet.rj.servi.node.RServiNode;
import org.eclipse.statet.rj.servi.node.RServiNodeConfig;
/**
 * {@link NodeFactory} implementation which runs each R node as a separate local
 * operating-system process started through a {@link ProcessBuilder}.
 */
public class LocalNodeFactory implements NodeFactory {
// Bundle specs which setConfig(...) resolves for the Java codebase of a node, if the
// configured Java arguments do not specify 'java.rmi.server.codebase' explicitly.
public static final ImList<BundleSpec> CODEBASE_LIB_SPECS=
// Names of environment variables of the node configuration which setConfig(...) does not
// copy into the environment of the node process.
private static final Set<String> EXCLUDE_ENV_VAR_NAMES= ImCollections.newSet(
private static void copySystemProperty(final String key, final List<String> command) {
final String property= System.getProperty(key);
if (property != null) {
command.add("-D" + key + "=" + property);
private static void copySystemPropertyPath(final String key, final List<String> command) {
String property= System.getProperty(key);
if (property != null) {
final Path path= Path.of(property);
if (!path.isAbsolute()) {
property= FileUtils.getUserWorkingDirectory().resolve(path).toString();
command.add("-D" + key + "=" + property);
private static List<String> createSSLPropertyArgs() {
final List<String> args= new ArrayList<>();
copySystemPropertyPath("", args);
copySystemProperty("", args);
copySystemPropertyPath("", args);
copySystemProperty("", args);
return args;
// Single-threaded scheduler shared by all factories; stopNode(...) schedules its delayed
// check of a stopping node process on it. Threads are created by DaemonThreadFactory.
private static final ScheduledExecutorService MONITOR_EXECUTOR= Executors.newSingleThreadScheduledExecutor(
new DaemonThreadFactory("LocalNodeFactory-Monitor") );
// Name of the log file looked up in the working directory of a node.
private static final String NODELOG_FILENAME= "out.log"; //$NON-NLS-1$
/**
 * Settings for the node processes, derived from a node configuration by
 * {@code setConfig(...)} and read by {@code createNode(...)}.
 */
private static class ProcessConfig {
// environment variables collected for the node process (R_HOME, JAVA_HOME, PATH, ...)
final Map<String, String> addEnv= new HashMap<>();
// command line template for the node process
final List<String> command= new ArrayList<>();
// index in the command where createNode(...) sets the RMI address of the node
int nameCommandIdx= -1;
// directory in which the working directory of each node is created
Path baseWorkingDir;
// value passed to RServiNode.setConsole(...): "none" if the console is enabled, otherwise null
@Nullable String authConfig;
// R code which createNode(...) splits into lines after the node is started
@Nullable String rStartupSnippet;
public ProcessConfig() {
// id of the pool; prefix of the factory id and of the probe directory name in checkBaseDir(...)
private final String poolId;
// pool id followed by a random 8-digit hex number; prefix of the ids of all nodes of this factory
private final String factoryId;
// the configuration most recently accepted by setConfig(...)
private @Nullable RServiNodeConfig baseConfig;
private final RJContext context;
// bundle specs resolved in setConfig(...) for the Java classpath of a node
private final ImList<BundleSpec> libSpecs;
// the process settings derived by setConfig(...); null as long as no configuration was accepted
private @Nullable ProcessConfig processConfig;
// reason of the last configuration failure; reported by createNode(...) if processConfig is missing
private @Nullable String errorMessage= null;
// registry in which createNode(...) looks up the server of a started node
private @Nullable RMIRegistry nodeRegistry;
private boolean verbose;
// timeout for start and stop of a node in nanoseconds; a negative value disables the start timeout
private long timeoutNanos= TimeUnit.SECONDS.toNanos(10);
// arguments inserted into the command if the registry address uses SSL
private final List<String> sslPropertyArgs;
// incremented by createNode(...) for each node and used as part of the node id
private int nodeCounter;
public LocalNodeFactory(final String poolId,
final RJContext context, final List<BundleSpec> libSpecs) {
if (poolId == null) {
throw new NullPointerException("poolId");
if (context == null) {
throw new NullPointerException("context");
this.poolId= poolId;
this.factoryId= String.format("%1$s-%2$08X", poolId, ThreadLocalRandom.current().nextInt());
this.context= context;
this.libSpecs= ImCollections.toList(libSpecs);
this.sslPropertyArgs= createSSLPropertyArgs();
public void setRegistry(final @Nullable RMIRegistry registry) {
this.nodeRegistry= registry;
/**
 * Validates the given node configuration and derives the process settings (command line,
 * environment variables, base working directory) used to start new nodes.
 *
 * <p>If the validation fails, the reason is stored in {@code errorMessage} and reported
 * by the thrown exception; the settings of the factory are replaced only at the end,
 * after all checks passed.</p>
 *
 * @param config the node configuration to apply
 * @throws RjInvalidConfigurationException if R_HOME is missing or invalid, the R package
 *     'rj' cannot be found, bundles cannot be resolved or the base path of the working
 *     directory is invalid
 */
public void setConfig(final RServiNodeConfig config) throws RjInvalidConfigurationException {
final ProcessConfig p= new ProcessConfig();
final StringBuilder sb= new StringBuilder();
// R home
final String rHomeString;
// prefer the explicit R home of the configuration, otherwise fall back to the R_HOME
// entry of its environment variables
{ String value= config.getRHome();
if (value == null || value.isEmpty()) {
value= config.getEnvironmentVariables().get("R_HOME");
if (value == null || value.isEmpty()) {
this.errorMessage= "Missing value for R_HOME.";
throw new RjInvalidConfigurationException(this.errorMessage);
rHomeString= value;
final Path rHomeDir;
try {
rHomeDir= Path.of(rHomeString);
catch (final InvalidPathException e) {
this.errorMessage= String.format("Invalid value for R_HOME: %1$s.", e.getMessage());
throw new RjInvalidConfigurationException(this.errorMessage);
if (!Files.isDirectory(rHomeDir)) {
this.errorMessage= "Invalid value for R_HOME: directory does not exists.";
throw new RjInvalidConfigurationException(this.errorMessage);
p.addEnv.put("R_HOME", rHomeString);
// R lib path
// variable lookup for the R environment: R_HOME is answered with the value validated
// above, all other names with the environment variables of the configuration
final LocalREnv serverREnv= new LocalREnv((final String name) -> {
if (name.equals("R_HOME")) {
return rHomeString;
return config.getEnvironmentVariables().get(name);
final Path rjPkgPath= serverREnv.searchRPkg("rj");
if (rjPkgPath == null) {
this.errorMessage= "Can not find the R package 'rj' in the R library path:\n\t" +
CollectionUtils.toString(serverREnv.getRLibPaths(), "\n\t");
throw new RjInvalidConfigurationException(this.errorMessage);
// Java
String javaHome= config.getJavaHome();
// without explicit Java home, use the Java installation running this JVM
if (javaHome == null || javaHome.isEmpty()) {
javaHome= nonNullAssert(System.getProperty("java.home"));
p.addEnv.put("JAVA_HOME", javaHome);
p.command.add(javaHome + File.separatorChar + "bin" + File.separatorChar + "java");
{ p.command.add("-classpath");
String s= new BundleEntry.Jar("rj-boot", rjPkgPath.resolve("server/rj-boot.jar"))
final String env= config.getEnvironmentVariables().get("CLASSPATH");
if (env != null) {
s+= File.pathSeparatorChar + env;
String javaArgs= config.getJavaArgs();
if (javaArgs != null && (javaArgs= javaArgs.trim()).length() > 0) {
else {
javaArgs= "";
// the server class path is derived from the lib specs of this factory only if the Java
// arguments of the configuration do not specify the property explicitly
if (javaArgs.indexOf("-D" + RJContext.RJ_SERVER_CLASS_PATH_PROPERTY_KEY + "=") < 0) {
final List<BundleEntry> bundles;
try {
bundles= this.context.resolveBundles(this.libSpecs);
catch (final StatusException e) {
this.errorMessage= "Can not resolve bundles for Java classpath of node - " + e.getMessage();
throw new RjInvalidConfigurationException(this.errorMessage, e);
sb.append("-D" + RJContext.RJ_SERVER_CLASS_PATH_PROPERTY_KEY + "=");
// RMI
// forward the RMI hostname of this JVM to the node, if it is set
final String hostname= System.getProperty("java.rmi.server.hostname");
if (hostname != null && hostname.length() > 0) {
p.command.add("-Djava.rmi.server.hostname=" + hostname);
if (javaArgs.indexOf("") < 0) {
// same rule as for the class path: an explicit codebase in the Java arguments wins
if (javaArgs.indexOf("-Djava.rmi.server.codebase=") < 0) {
final List<BundleEntry> bundles;
try {
bundles= this.context.resolveBundles(CODEBASE_LIB_SPECS);
catch (final StatusException e) {
this.errorMessage= "Can not resolve bundles for Java codebase of node - " + e.getMessage();
throw new RjInvalidConfigurationException(this.errorMessage, e);
// Main
// remember the position in the command at which createNode(...) sets the RMI address
// of the individual node
p.nameCommandIdx= p.command.size();
// ...
String nodeArgs= config.getNodeArgs();
if (nodeArgs != null && (nodeArgs= nodeArgs.trim()).length() > 0) {
String rArch= config.getRArch();
if (rArch != null && rArch.isEmpty()) {
rArch= null;
boolean rArchAuto= false;
// no architecture specified and the node uses the Java installation of this JVM:
// derive the architecture from 'os.arch' of this JVM
if (rArch == null && javaHome.equals(System.getProperty("java.home"))) {
rArch= nonNullAssert(System.getProperty("os.arch"));
if (rArch.equals("amd64")) {
rArch= "x86_64";
else if (rArch.equals("x86")) {
rArch= "i386";
rArchAuto= true;
if (rArch != null) {
// validate R_ARCH
// on Windows the architecture directory is looked up in <R_HOME>/bin and "x86_64"
// is mapped to "x64"
if (SystemUtils.getLocalOs() == OS_WIN) {
if (rArch.equals("x86_64")) {
rArch= "x64";
final var binDir= rHomeDir.resolve("bin");
if (!Files.isDirectory(binDir.resolve(rArch))) {
rArch= null;
else {
// on other systems the architecture directory is looked up in <R_HOME>/bin/exec;
// for i386, i586 and i686 whichever of these directories exists is used
final var execDir= rHomeDir.resolve("bin").resolve("exec");
if (!Files.isDirectory(execDir.resolve(rArch))) {
if (Files.isDirectory(execDir) &&
(rArch.equals("i386") || rArch.equals("i586") || rArch.equals("i686")) ) {
if (Files.isDirectory(execDir.resolve("i686"))) {
rArch= "i686";
else if (Files.isDirectory(execDir.resolve("i586"))) {
rArch= "i586";
else if (Files.isDirectory(execDir.resolve("i386"))) {
rArch= "i386";
else {
rArch= null;
else {
rArch= null;
if (rArch != null) {
p.addEnv.put("R_ARCH", '/'+rArch);
else if (!rArchAuto) {
Utils.logInfo("Failed to validate specified architecture, value is not used.");
// prepend the directories of R to the search paths for executables and libraries;
// the names of the variables and the directories depend on the operating system
switch (SystemUtils.getLocalOs()) {
case OS_WIN: {
final String rBinDir;
if (rArch != null) {
rBinDir= rHomeString + File.separatorChar + "bin" + File.separatorChar + rArch;
else {
rBinDir= rHomeString + File.separatorChar + "bin";
final String pathEnv= System.getenv("PATH");
p.addEnv.put("PATH", (pathEnv != null) ? (rBinDir + File.pathSeparatorChar + pathEnv) : rBinDir);
break; }
case OS_MAC: {
final String rBinDir= rHomeString + File.separatorChar + "bin";
final String pathEnv= System.getenv("PATH");
p.addEnv.put("PATH", (pathEnv != null) ? (rBinDir + File.pathSeparatorChar + pathEnv) : rBinDir);
final String rLibDir= rHomeString + File.separatorChar + "lib";
final String libPathEnv= System.getenv("DYLD_LIBRARY_PATH");
p.addEnv.put("DYLD_LIBRARY_PATH", (libPathEnv != null) ? (rLibDir + File.pathSeparatorChar + libPathEnv) : rLibDir);
break; }
default: {
final String rBinDir= rHomeString + File.separatorChar + "bin";
final String pathEnv= System.getenv("PATH");
p.addEnv.put("PATH", (pathEnv != null) ? (rBinDir + File.pathSeparatorChar + pathEnv) : rBinDir);
final String rLibDir;
if (rArch != null) {
rLibDir= rHomeString + File.separatorChar + "lib" + File.separatorChar + rArch;
else {
rLibDir= rHomeString + File.separatorChar + "lib";
final String libPathEnv= System.getenv("LD_LIBRARY_PATH");
p.addEnv.put("LD_LIBRARY_PATH", (libPathEnv != null) ? (rLibDir + File.pathSeparatorChar + libPathEnv) : rLibDir);
break; }
{ String dirString= config.getBaseWorkingDirectory();
if (dirString == null || dirString.isEmpty()) {
dirString= nonNullAssert(System.getProperty(""));
try {
p.baseWorkingDir= checkBaseDir(dirString);
catch (final InvalidPathException | IOException e) {
this.errorMessage= "Invalid working directory base path.";
throw new RjInvalidConfigurationException(this.errorMessage, e);
// add the environment variables of the configuration, except the excluded names
for (final Entry<String, String> var : config.getEnvironmentVariables().entrySet()) {
if (!EXCLUDE_ENV_VAR_NAMES.contains(var.getKey())) {
p.addEnv.put(var.getKey(), var.getValue());
p.authConfig= config.getEnableConsole() ? "none" : null;
p.rStartupSnippet= config.getRStartupSnippet();
final var timeout= config.getStartStopTimeout();
// replace the settings under the monitor which createNode(...) holds when it reads them;
// a missing timeout is stored as -1, which createNode(...) treats as no timeout
synchronized (this) {
this.verbose= config.getEnableVerbose();
this.baseConfig= config;
this.processConfig= p;
this.timeoutNanos= (timeout != null) ? timeout.toNanos() : -1;
private Path checkBaseDir(final String s) throws IOException {
final Path path= Path.of(s);
final Path testDir= Files.createDirectory(path.resolve(this.poolId + "-test"));
return path;
public @Nullable RServiNodeConfig getConfig() {
return this.baseConfig;
/**
 * Starts a new R node as local process and initializes the given handler with it.
 *
 * <p>The method prepares id and working directory of the node (with one retry), builds
 * the command line from the current process settings and starts the process. It then
 * looks up the server of the node in the registry repeatedly until it is bound, the
 * start timeout is exceeded or the process has exited. If starting fails, the thrown
 * exception contains the command, the output of the process and the content of the node
 * log file, as far as available.</p>
 *
 * @param handler the handler of the node to create
 * @throws RjInvalidConfigurationException if no valid node configuration or no registry
 *     is set
 * @throws RjException if the node cannot be prepared or started
 */
public void createNode(final NodeHandler handler) throws RjException {
final long t= System.nanoTime();
final long timeout;
final ProcessConfig p;
final RMIRegistry registry;
final int counter;
// take a snapshot of the current settings under the monitor used by setConfig(...)
synchronized (this) {
p= this.processConfig;
if (p == null) {
final String message= this.errorMessage;
throw new RjInvalidConfigurationException((message != null) ? message :
"Missing R node configuration.");
registry= this.nodeRegistry;
if (registry == null) {
throw new RjInvalidConfigurationException("Missing registry configuration.");
timeout= this.timeoutNanos;
counter= ++this.nodeCounter;
try {
prepareNode(handler, p.baseWorkingDir, counter, registry);
catch (final RjException e) {
try { // retry
prepareNode(handler, p.baseWorkingDir, counter, registry);
logWarning(handler, "Preparing R node required a second attempt.", e);
catch (final InterruptedException | RjException e2) {
// the exception of the first attempt is reported as cause
throw new RjException("Error preparing R node.", e);
ProcessBuilder processBuilder;
List<String> command= null;
try {
command= new ArrayList<>(p.command.size() + 2);
// the entry at nameCommandIdx is replaced by the RMI address of this node
command.set(p.nameCommandIdx, handler.address.toString());
if (this.verbose) {
// for a registry using SSL, the SSL property arguments are inserted at nameCommandIdx - 1
if (registry.getAddress().isSsl()) {
command.addAll(p.nameCommandIdx - 1, this.sslPropertyArgs);
processBuilder= new ProcessBuilder(command);
catch (final Exception e) {
throw new RjException("Error preparing process for R node.", e);
Process process= null;
try {
process= processBuilder.start();
// look up the server of the node until it is bound in the registry
for (int i= 1; i < Integer.MAX_VALUE; i++) {
try {
final Server server= (Server)registry.getRegistry().lookup(handler.nodeId);
final ServerLogin login= server.createLogin(Server.C_RSERVI_NODECONTROL);
final RServiNode node= (RServiNode)server.execute(Server.C_RSERVI_NODECONTROL, null, login);
logInfo(handler, "New R node started.",
(message) -> message.addProp("duration(ms)", (System.nanoTime() - t)/1000000) );
// 'line' keeps the current line of the startup snippet for the error message
String line= null;
try {
final String snippet= p.rStartupSnippet;
if (snippet != null && snippet.length() > 0) {
final String[] lines= snippet.split("\\p{Blank}*\\r[\\n]?|\\n\\p{Blank}*"); //$NON-NLS-1$
for (int j= 0; j < lines.length; j++) {
line= lines[j];
if (line.length() > 0) {
catch (final RjException e) {
try {
catch (final Exception ignore) {}
throw new RjException("Running the R startup snippet failed in line '" + line + "'.", e);
try {
handler.isConsoleEnabled= node.setConsole(p.authConfig);
catch (final RjException e) {
try {
catch (final Exception ignore) {}
throw e;
handler.init2(node, process);
catch (final NotBoundException e) {
// server is not yet bound: abort only after at least 10 attempts and only if a
// timeout is set (>= 0) and exceeded
final long diff= System.nanoTime() - t;
if (i >= 10 && timeout >= 0 && diff > timeout) {
throw new RjException("Start of R node aborted because of timeout (t="+(diff/1000000L)+"ms).", e);
// exitValue() throws IllegalThreadStateException as long as the process is running
try {
final int exitValue= process.exitValue();
throw new RjException("R node process stopped (exit code= "+exitValue+").");
catch (final IllegalThreadStateException ok) {}
catch (final Exception e) {
// collect the available diagnostic information for the error message
final StringBuilder sb= new StringBuilder("Error starting R node:");
if (processBuilder != null) {
sb.append("\n<COMMAND>"); //$NON-NLS-1$
ServerUtils.prettyPrint(processBuilder.command(), sb);
sb.append("\n</COMMAND>"); //$NON-NLS-1$
if (process != null) {
final char[] buffer= new char[4096];
final InputStream stdout= process.getInputStream();
sb.append("\n<STDOUT>\n"); //$NON-NLS-1$
try (final var reader= new InputStreamReader(stdout)) {
int n;
try { // read non-blocking
while (reader.ready() && (n=, 0, buffer.length)) >= 0) {
sb.append(buffer, 0, n);
finally {
while ((n=, 0, buffer.length)) >= 0) {
sb.append(buffer, 0, n);
catch (final IOException ignore) {}
final Path logFile= handler.dir.resolve(NODELOG_FILENAME);
if (Files.exists(logFile)) {
sb.append("\n<LOG file=\"" + NODELOG_FILENAME + "\">\n");
try (final var reader= new InputStreamReader(Files.newInputStream(logFile),
StandardCharsets.UTF_8 )) {
int n;
while ((n=, 0, buffer.length)) >= 0) {
sb.append(buffer, 0, n);
// limit the size of the error message
if (sb.length() > 100000) {
sb.append(" ...");
catch (final IOException ignore) {}
if (sb.charAt(sb.length() - 1) != '\n') {
sb.append("\n--------"); //$NON-NLS-1$
try {
catch (final IOException ignore) {}
throw new RjException(sb.toString(), e);
protected void prepareNode(final NodeHandler handler,
final Path baseWd, final int counter, final RMIRegistry registry) throws RjException {
final String id= String.format("%1$s-%2$016X-%3$08X",
this.factoryId, System.nanoTime(), counter );
final Path dir= baseWd.resolve(id);
final RMIAddress rmiAddress= new RMIAddress(registry.getAddress(), id);
try {
handler.init1(id, rmiAddress,
Files.createDirectory(dir) );
catch (final IOException e) {
throw new RjException(
String.format("Failed to create working directory: %1$s", dir),
e );
/**
 * Stops the node of the given handler.
 *
 * <p>The process is detached from the handler first. If the process is already down,
 * a warning with its exit code and the tail of the node log is logged. Otherwise a
 * monitor task is scheduled which checks after the timeout whether the process is still
 * alive. Errors during the shutdown are logged as warning and not thrown.</p>
 *
 * NOTE(review): the statements which request the shutdown, wait for the process, kill it
 * and trigger the cleanup are not part of the code shown here - confirm against the
 * complete source.
 *
 * @param handler the handler of the node to stop
 */
public void stopNode(final NodeHandler handler) {
final Process process= handler.process;
handler.process= null;
// presumably set to a non-zero value once the shutdown of the node is requested; it is
// only read below to decide whether an UnmarshalException is ignored - TODO confirm
final AtomicInteger exitType= new AtomicInteger(0);
try {
if (process != null) {
if (!process.isAlive()) {
logWarning(handler, "R node is already down (process crashed?).",
(message) -> {
message.addProp("nodeExitCode", process.exitValue());
message.addProp("nodeLog", readLogTail(handler));
null );
else {
// check after the timeout whether the process is still alive
MONITOR_EXECUTOR.schedule(() -> {
try {
if (process.isAlive()) {
logWarning(handler, "Killed R node, because it did not shut down regularly.");
catch (final Throwable e) {
logError(handler, "A runtime error occurred in StopMonitor during shutdown of R node.", e);
}, this.timeoutNanos, TimeUnit.NANOSECONDS);
if (process != null) {
try {
catch (final InterruptedException e) {}
catch (final Throwable e) {
if (exitType.get() != 0 && e instanceof UnmarshalException) {
// ignore
else {
logWarning(handler, "An error occurred when trying to shut down R node.", e);
if (!this.verbose) {
/**
 * Cleans up the working directory of the node, if it exists.
 *
 * <p>An operation failing with an {@link IOException} is repeated; after 20 repetitions
 * the exception is no longer retried. Any failure is logged as warning together with the
 * path of the directory and is not thrown.</p>
 *
 * NOTE(review): the delete operation itself and the pause between the attempts are not
 * part of the code shown here - confirm against the complete source.
 *
 * @param handler the handler of the node
 */
private void cleanupNode(final NodeHandler handler) {
try {
if (Files.isDirectory(handler.dir)) {
for (int i= 0; ; i++) {
try {
catch (final IOException e) {
// give up after 20 failed repetitions
if (i >= 20) {
throw e;
try {
catch (final InterruptedException e) {}
catch (final Throwable e) {
logWarning(handler, "Failed to delete working directory of the R node.",
(message) -> message.addProp("path", handler.dir),
e );
// Size limit in bytes: smaller log files are read completely by readLogTail(...), for
// larger files only the end is read.
private static final int LOG_NODELOG_MAX= 50_000;
/**
 * Reads the end of the log file in the working directory of the node.
 *
 * <p>A file smaller than {@code LOG_NODELOG_MAX} bytes is returned completely. For a
 * larger file the channel is positioned at {@code LOG_NODELOG_MAX + 1} bytes before the
 * end of the file and bytes are skipped up to the next line feed. The content is decoded
 * as UTF-8 and trimmed.</p>
 *
 * @param handler the handler of the node
 * @return the content read from the log file, or a text of the form
 *     {@code <error: message>} if reading the file failed
 */
private String readLogTail(final NodeHandler handler) {
final Path logFile= handler.dir.resolve(NODELOG_FILENAME);
try {
final long fileSize= Files.size(logFile);
if (fileSize < LOG_NODELOG_MAX) {
return Files.readString(logFile, StandardCharsets.UTF_8).trim();
else {
try (final SeekableByteChannel channel= Files.newByteChannel(logFile,
StandardOpenOption.READ)) {
channel.position(fileSize - LOG_NODELOG_MAX - 1);
final var stream= Channels.newInputStream(channel);
int b;
// skip the remainder of the line in which the position lies
while ((b= != -1 && b != '\n') {
final ByteArrayOutputStream bytes= new ByteArrayOutputStream(LOG_NODELOG_MAX);
return bytes.toString(StandardCharsets.UTF_8).trim();
catch (final IOException e) {
return String.format("<error: %1$s>", e.getMessage()); //$NON-NLS-1$
/**
 * Creates a status for a message regarding the node of the given handler.
 *
 * <p>The message consists of the main message and the property {@code nodeId}; the
 * status uses the id {@code RServiUtils.RJ_SERVI_ID}.</p>
 *
 * NOTE(review): the application of the customizer and the call receiving the status are
 * not part of the code shown here - presumably the customizer adds further properties and
 * the status is written to the log; confirm against the complete source.
 *
 * @param handler the handler of the node
 * @param severity the severity of the status
 * @param mainMessage the main message
 * @param messageCustomizer an optional customizer for the message
 * @param e an optional exception added to the status
 */
private void log(final NodeHandler handler,
final byte severity, final String mainMessage,
final @Nullable Consumer<ToStringBuilder> messageCustomizer,
final @Nullable Throwable e) {
final var message= new ToStringBuilder(mainMessage);
message.addProp("nodeId", handler.nodeId); //$NON-NLS-1$
if (messageCustomizer != null) {
Status.newStatus(severity, RServiUtils.RJ_SERVI_ID, message.toString(), e) );
private void logInfo(final NodeHandler handler, final String mainMessage,
final @Nullable Consumer<ToStringBuilder> messageCustomizer) {
log(handler, Status.INFO, mainMessage, messageCustomizer, null);
private void logWarning(final NodeHandler handler, final String mainMessage,
final @Nullable Consumer<ToStringBuilder> messageCustomizer,
final @Nullable Throwable e) {
log(handler, Status.WARNING, mainMessage, messageCustomizer, e);
private void logWarning(final NodeHandler handler, final String mainMessage,
final @Nullable Throwable e) {
log(handler, Status.WARNING, mainMessage, null, e);
private void logWarning(final NodeHandler handler, final String mainMessage) {
log(handler, Status.WARNING, mainMessage, null, null);
private void logError(final NodeHandler handler, final String mainMessage,
final @Nullable Throwable e) {
log(handler, Status.ERROR, mainMessage, null, e);