blob: 1c51343a6f4c565d41135db9dea678b92240271a [file] [log] [blame]
/*=============================================================================#
# Copyright (c) 2009, 2017 Stephan Wahlbrink and others.
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
# which is available at https://www.apache.org/licenses/LICENSE-2.0.
#
# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
#
# Contributors:
# Stephan Wahlbrink <sw@wahlbrink.eu> - initial API and implementation
#=============================================================================*/
package org.eclipse.statet.internal.rj.servi.server;
import java.io.File;
import java.io.PrintStream;
import java.rmi.ConnectException;
import java.rmi.RemoteException;
import java.rmi.server.RMIClientSocketFactory;
import java.rmi.server.RMIServerSocketFactory;
import java.rmi.server.RemoteServer;
import java.rmi.server.ServerNotActiveException;
import java.rmi.server.UnicastRemoteObject;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.logging.Level;
import javax.rmi.ssl.SslRMIClientSocketFactory;
import javax.rmi.ssl.SslRMIServerSocketFactory;
import javax.security.auth.login.LoginException;
import org.eclipse.statet.rj.RjException;
import org.eclipse.statet.rj.server.DataCmdItem;
import org.eclipse.statet.rj.server.MainCmdC2SList;
import org.eclipse.statet.rj.server.MainCmdItem;
import org.eclipse.statet.rj.server.MainCmdS2CList;
import org.eclipse.statet.rj.server.RjsComConfig;
import org.eclipse.statet.rj.server.RjsComObject;
import org.eclipse.statet.rj.server.RjsStatus;
import org.eclipse.statet.rj.server.Server;
import org.eclipse.statet.rj.server.ServerLogin;
import org.eclipse.statet.rj.server.srv.RMIServerControl;
import org.eclipse.statet.rj.server.srv.engine.SrvEngineServer;
import org.eclipse.statet.rj.server.srvext.Client;
import org.eclipse.statet.rj.server.srvext.ServerAuthMethod;
import org.eclipse.statet.rj.server.srvext.ServerRuntimePlugin;
import org.eclipse.statet.rj.server.srvext.auth.NoAuthMethod;
import org.eclipse.statet.rj.server.util.ServerUtils;
import org.eclipse.statet.rj.servi.node.RServiNode;
public class NodeServer extends SrvEngineServer {
class ConsoleDummy extends Thread {
private final Client client;
private final PrintStream out;
private final MainCmdC2SList c2sList= new MainCmdC2SList();
ConsoleDummy(final Client client) {
setName("R Console");
setDaemon(true);
setPriority(NORM_PRIORITY-1);
this.client= client;
this.out= System.out;
}
@Override
public void run() {
try {
synchronized (NodeServer.this.srvEngine) {
if (NodeServer.this.isConsoleEnabled || NodeServer.this.isConsoleDummyRunning) {
return;
}
NodeServer.this.srvEngine.connect(this.client, new HashMap<String, Object>());
NodeServer.this.isConsoleDummyRunning= true;
}
RjsComObject sendCom= null;
boolean error= false;
while (true) {
try {
if (sendCom == null) {
this.c2sList.setObjects(null);
sendCom= this.c2sList;
}
final RjsComObject receivedCom= NodeServer.this.srvEngine.runMainLoop(this.client, sendCom);
sendCom= null;
error= false;
if (receivedCom != null) {
switch (receivedCom.getComType()) {
case RjsComObject.T_PING:
sendCom= RjsStatus.OK_STATUS;
break;
case RjsComObject.T_MAIN_LIST:
MainCmdItem item= ((MainCmdS2CList) receivedCom).getItems();
MainCmdItem tmp;
ITER_ITEMS : for (; (item != null); tmp= item, item= item.next, tmp.next= null) {
switch (item.getCmdType()) {
case MainCmdItem.T_CONSOLE_WRITE_ITEM:
this.out.println("R-OUT (" + item.getOp() + "): " + item.getDataText());
break;
case MainCmdItem.T_CONSOLE_READ_ITEM:
this.out.println("R-PROMPT: " + item.getDataText());
break;
}
}
break;
case RjsComObject.T_STATUS:
switch (((RjsStatus) receivedCom).getCode()) {
case Server.S_DISCONNECTED:
throw new ConnectException("");
case Server.S_LOST:
case Server.S_NOT_STARTED:
case Server.S_STOPPED:
return;
}
}
}
}
catch (final ConnectException e) {
synchronized (NodeServer.this.srvEngine) {
if (NodeServer.this.isConsoleEnabled) {
NodeServer.this.isConsoleDummyRunning= false;
return;
}
NodeServer.this.srvEngine.connect(this.client, new HashMap<String, Object>());
}
}
catch (final Exception e) {
if (error) {
throw e;
}
LOGGER.log(Level.SEVERE, "An error occurred when running dummy R REPL. Trying to continue REPL.", e);
error= true;
}
if (sendCom == null) {
try {
sleep(5000);
}
catch (final InterruptedException e) {
}
}
}
}
catch (final Exception e) {
LOGGER.log(Level.SEVERE, "An error occurred when running dummy R REPL. Stopping REPL.", e);
}
}
}
class Node implements RServiNode {
@Override
public boolean setConsole(final String authConfig) throws RemoteException, RjException {
final boolean enabled;
synchronized (NodeServer.this.srvEngine) {
// LOGGER.fine("enter lock");
final Client currentClient= NodeServer.this.srvEngine.getCurrentClient();
if (currentClient != null) {
NodeServer.this.srvEngine.disconnect(currentClient);
}
// LOGGER.fine("disconnect");
if (authConfig != null) {
NodeServer.this.authMethod= NodeServer.this.control.createServerAuth(authConfig);
enabled= NodeServer.this.isConsoleEnabled= true;
}
else {
NodeServer.this.authMethod= new NoAuthMethod("<internal>");
enabled= NodeServer.this.isConsoleEnabled= false;
// LOGGER.fine("before start");
if (!NodeServer.this.isConsoleDummyRunning) {
new ConsoleDummy(NodeServer.this.consoleDummyClient).start();
}
// LOGGER.fine("after start");
}
}
return enabled;
}
@Override
public int getEvalTime() throws RemoteException {
return 0;
}
@Override
public void ping() throws RemoteException {
}
@Override
public String getPoolHost() throws RemoteException {
try {
return RemoteServer.getClientHost();
}
catch (final ServerNotActiveException e) {
return "<internal>";
}
}
@Override
public void runSnippet(final String code) throws RemoteException, RjException {
runServerLoopCommand(null, new DataCmdItem(DataCmdItem.EVAL_EXPR_VOID, 0,
code, null, null, null ));
}
@Override
public RServiBackend bindClient(final String client) throws RemoteException {
return NodeServer.this.bindClient(client);
}
@Override
public void unbindClient() throws RemoteException {
NodeServer.this.unbindClient();
}
@Override
public void shutdown() throws RemoteException {
NodeServer.this.shutdown();
}
}
class Backend implements RServiBackend {
@Override
public Server getPublic() throws RemoteException {
return null;
}
@Override
public Map<String, Object> getPlatformData() {
return NodeServer.this.srvEngine.getPlatformData();
}
@Override
public void setProperties(final Map<String, ? extends Object> properties) throws RemoteException {
NodeServer.this.setProperties(properties, this);
}
public boolean interrupt() throws RemoteException {
throw new UnsupportedOperationException();
}
@Override
public void disconnect() throws RemoteException {
throw new UnsupportedOperationException();
}
@Override
public RjsComObject runMainLoop(final RjsComObject com) throws RemoteException {
return NodeServer.this.runMainLoop(com, this);
}
@Override
public RjsComObject runAsync(final RjsComObject com) throws RemoteException {
return NodeServer.this.runAsync(com, this);
}
@Override
public boolean isClosed() throws RemoteException {
return (NodeServer.this.currentClientBackend != this);
}
}
private boolean isConsoleEnabled;
private boolean isConsoleDummyRunning;
private final ServerAuthMethod rserviAuthMethod;
private final Client consoleDummyClient;
private String currentClientId;
private Backend currentClientBackend;
private RServiBackend currentClientExp;
private final Object serviRunLock= new Object();
private String resetCommand;
private RMIClientSocketFactory clientSocketFactory;
private RMIServerSocketFactory serverSocketFactory;
public NodeServer(final RMIServerControl control) {
super(control, new NoAuthMethod("<internal>")); //$NON-NLS-1$
this.rserviAuthMethod= new NoAuthMethod("<internal>"); //$NON-NLS-1$
this.consoleDummyClient= new Client("-", "dummy", (byte) 0); //$NON-NLS-1$
if (control.getOptions().containsKey("ssl")) { //$NON-NLS-1$
this.clientSocketFactory= new SslRMIClientSocketFactory();
this.serverSocketFactory= new SslRMIServerSocketFactory(null, null, true);
}
}
@Override
public boolean getConfigUnbindOnStartup() {
return false;
}
@Override
public void start(final ServerRuntimePlugin runtimePlugin) throws Exception {
super.start(runtimePlugin);
this.resetCommand= "{" +
"rm(list=ls());" +
"gc();" +
".rj.getTmp<-function(o){x<-get(o,pos=.GlobalEnv);rm(list=o,pos=.GlobalEnv);x};" +
".rj.wd<-\""+this.workingDirectory.replace("\\", "\\\\")+"\";" +
"setwd(.rj.wd);" +
"graphics.off();" +
"}";
RjsComConfig.setServerPathResolver(this);
final Map<String, Object> properties= new HashMap<>();
properties.put("args", new String[0]);
this.srvEngine.start(this.consoleDummyClient, properties);
try {
synchronized (this.serviRunLock) {
LOGGER.log(Level.FINE, "Initializing R node: Loading R package 'rj'...");
runServerLoopCommand(null, new DataCmdItem(DataCmdItem.EVAL_EXPR_VOID, 0,
"library(rj)", null, null, null ));
LOGGER.log(Level.FINE, "Initializing R node: Preparing R workspace for first client...");
runServerLoopCommand(null, new DataCmdItem(DataCmdItem.EVAL_EXPR_VOID, 0,
this.resetCommand, null, null, null ));
}
}
catch (final Exception e) {
throw new RjException("An error occurred while preparing initially the workspace.", e);
}
LOGGER.log(Level.FINE, "Initializing R node: R engine started and initialized.");
}
@Override
protected ServerAuthMethod getAuthMethod(final String command) {
if (command.startsWith("rservi")) {
return this.rserviAuthMethod;
}
return super.getAuthMethod(command);
}
@Override
public Object execute(final String command, final Map<String, ? extends Object> properties, final ServerLogin login) throws RemoteException, LoginException {
if (command.equals(C_CONSOLE_START)) {
throw new UnsupportedOperationException();
}
if (command.equals(C_CONSOLE_CONNECT)) {
synchronized (this.srvEngine) {
if (!this.isConsoleEnabled) {
throw new RemoteException("Console is not enabled.");
}
final Client client= connectClient(command, login);
return this.srvEngine.connect(client, properties);
}
}
if (command.equals(C_RSERVI_NODECONTROL)) {
final Client client= connectClient(command, login);
final Node node= new Node();
final RServiNode exported= (RServiNode) UnicastRemoteObject.exportObject(node, 0,
this.clientSocketFactory, this.serverSocketFactory );
return exported;
}
throw new UnsupportedOperationException();
}
RServiBackend bindClient(final String client) throws RemoteException {
synchronized (this.serverClient) {
if (NodeServer.this.currentClientBackend != null) {
throw new IllegalStateException();
}
final Backend backend= new Backend();
final RServiBackend export= (RServiBackend) UnicastRemoteObject.exportObject(backend, 0,
this.clientSocketFactory, this.serverSocketFactory );
this.currentClientId= client;
this.currentClientBackend= backend;
this.currentClientExp= export;
SrvEngineServer.addClient(export);
return export;
}
}
void unbindClient() throws RemoteException {
synchronized (this.serverClient) {
final Backend previous= this.currentClientBackend;
if (previous != null) {
SrvEngineServer.removeClient(this.currentClientExp);
this.currentClientId= null;
this.currentClientBackend= null;
this.currentClientExp= null;
UnicastRemoteObject.unexportObject(previous, true);
try {
synchronized (this.serviRunLock) {
runServerLoopCommand(null, new DataCmdItem(DataCmdItem.EVAL_EXPR_VOID, 0,
this.resetCommand, null, null, null ));
ServerUtils.cleanDir(new File(this.workingDirectory), "out.log");
}
}
catch (final Exception e) {
throw new RemoteException("An error occurred while resetting the workspace.", e);
}
}
}
}
void shutdown() {
this.control.checkCleanup();
new Timer(true).schedule(new TimerTask() {
@Override
public void run() {
try {
unbindClient();
}
catch (final Exception e) {
e.printStackTrace();
}
System.exit(0);
}
}, 500L);
}
public void setProperties(final Map<String, ? extends Object> properties, final Object caller) throws RemoteException {
synchronized (this.serviRunLock) {
if (caller != null && this.currentClientBackend != caller) {
throw new IllegalAccessError();
}
this.srvEngine.setProperties(this.serverClient, properties);
}
}
@Override
protected RjsComObject runMainLoop(final RjsComObject com, final Object caller) throws RemoteException {
synchronized (this.serviRunLock) {
if (caller != null && this.currentClientBackend != caller) {
throw new IllegalAccessError();
}
}
return super.runMainLoop(com, caller);
}
private RjsComObject runAsync(final RjsComObject com, final Backend backend) throws RemoteException {
if (backend != null && this.currentClientBackend != backend) {
throw new IllegalAccessError();
}
return this.srvEngine.runAsync(this.serverClient, com);
}
}