| /*=============================================================================# |
| # Copyright (c) 2009, 2017 Stephan Wahlbrink and others. |
| # |
| # This program and the accompanying materials are made available under the |
| # terms of the Eclipse Public License 2.0 which is available at |
| # https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 |
| # which is available at https://www.apache.org/licenses/LICENSE-2.0. |
| # |
| # SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 |
| # |
| # Contributors: |
| # Stephan Wahlbrink <sw@wahlbrink.eu> - initial API and implementation |
| #=============================================================================*/ |
| |
| package org.eclipse.statet.internal.rj.servi.server; |
| |
| import java.io.File; |
| import java.io.PrintStream; |
| import java.rmi.ConnectException; |
| import java.rmi.RemoteException; |
| import java.rmi.server.RMIClientSocketFactory; |
| import java.rmi.server.RMIServerSocketFactory; |
| import java.rmi.server.RemoteServer; |
| import java.rmi.server.ServerNotActiveException; |
| import java.rmi.server.UnicastRemoteObject; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Timer; |
| import java.util.TimerTask; |
| import java.util.logging.Level; |
| |
| import javax.rmi.ssl.SslRMIClientSocketFactory; |
| import javax.rmi.ssl.SslRMIServerSocketFactory; |
| import javax.security.auth.login.LoginException; |
| |
| import org.eclipse.statet.rj.RjException; |
| import org.eclipse.statet.rj.server.DataCmdItem; |
| import org.eclipse.statet.rj.server.MainCmdC2SList; |
| import org.eclipse.statet.rj.server.MainCmdItem; |
| import org.eclipse.statet.rj.server.MainCmdS2CList; |
| import org.eclipse.statet.rj.server.RjsComConfig; |
| import org.eclipse.statet.rj.server.RjsComObject; |
| import org.eclipse.statet.rj.server.RjsStatus; |
| import org.eclipse.statet.rj.server.Server; |
| import org.eclipse.statet.rj.server.ServerLogin; |
| import org.eclipse.statet.rj.server.srv.RMIServerControl; |
| import org.eclipse.statet.rj.server.srv.engine.SrvEngineServer; |
| import org.eclipse.statet.rj.server.srvext.Client; |
| import org.eclipse.statet.rj.server.srvext.ServerAuthMethod; |
| import org.eclipse.statet.rj.server.srvext.ServerRuntimePlugin; |
| import org.eclipse.statet.rj.server.srvext.auth.NoAuthMethod; |
| import org.eclipse.statet.rj.server.util.ServerUtils; |
| import org.eclipse.statet.rj.servi.node.RServiNode; |
| |
| |
| public class NodeServer extends SrvEngineServer { |
| |
| |
| /** |
|  * Daemon thread that connects a placeholder client to the R engine and runs a |
|  * minimal main loop for it. |
|  * |
|  * The loop answers pings with {@link RjsStatus#OK_STATUS}, prints console output |
|  * and prompts received from the engine to {@code System.out}, and reconnects the |
|  * client after a disconnect unless the console has been enabled in the meantime. |
|  * It ends when the engine reports the status lost, not started or stopped, or |
|  * when two consecutive iterations fail with an unexpected exception. |
|  * |
|  * The flag {@code isConsoleDummyRunning} of the enclosing server is set by this |
|  * thread after it has connected and is always cleared again when the thread ends, |
|  * so that a later {@code setConsole(null)} is able to start a new dummy. |
|  */ |
| class ConsoleDummy extends Thread { |
|  |
|     private final Client client; |
|     private final PrintStream out; |
|  |
|     private final MainCmdC2SList c2sList= new MainCmdC2SList(); |
|  |
|     /** |
|      * Creates the (not yet started) dummy thread. |
|      * |
|      * @param client the client to connect to the engine and to run the main loop for |
|      */ |
|     ConsoleDummy(final Client client) { |
|         setName("R Console"); |
|         setDaemon(true); |
|         setPriority(NORM_PRIORITY-1); |
|         this.client= client; |
|         this.out= System.out; |
|     } |
|  |
|     @Override |
|     public void run() { |
|         // true while this thread is the one which set isConsoleDummyRunning |
|         boolean ownsRunningFlag= false; |
|         try { |
|             synchronized (NodeServer.this.srvEngine) { |
|                 // nothing to do if a real console is allowed or another dummy is active |
|                 if (NodeServer.this.isConsoleEnabled || NodeServer.this.isConsoleDummyRunning) { |
|                     return; |
|                 } |
|                 NodeServer.this.srvEngine.connect(this.client, new HashMap<String, Object>()); |
|                 NodeServer.this.isConsoleDummyRunning= true; |
|                 ownsRunningFlag= true; |
|             } |
|  |
|             RjsComObject sendCom= null; |
|             boolean error= false; |
|             while (true) { |
|                 try { |
|                     if (sendCom == null) { |
|                         // no specific answer pending: send an empty command list |
|                         this.c2sList.setObjects(null); |
|                         sendCom= this.c2sList; |
|                     } |
|                     final RjsComObject receivedCom= NodeServer.this.srvEngine.runMainLoop(this.client, sendCom); |
|                     sendCom= null; |
|                     error= false; |
|                     if (receivedCom != null) { |
|                         switch (receivedCom.getComType()) { |
|                         case RjsComObject.T_PING: |
|                             sendCom= RjsStatus.OK_STATUS; |
|                             break; |
|                         case RjsComObject.T_MAIN_LIST: |
|                             MainCmdItem item= ((MainCmdS2CList) receivedCom).getItems(); |
|                             while (item != null) { |
|                                 switch (item.getCmdType()) { |
|                                 case MainCmdItem.T_CONSOLE_WRITE_ITEM: |
|                                     this.out.println("R-OUT (" + item.getOp() + "): " + item.getDataText()); |
|                                     break; |
|                                 case MainCmdItem.T_CONSOLE_READ_ITEM: |
|                                     this.out.println("R-PROMPT: " + item.getDataText()); |
|                                     break; |
|                                 } |
|                                 // advance and unlink the processed item from the list |
|                                 final MainCmdItem next= item.next; |
|                                 item.next= null; |
|                                 item= next; |
|                             } |
|                             break; |
|                         case RjsComObject.T_STATUS: |
|                             switch (((RjsStatus) receivedCom).getCode()) { |
|                             case Server.S_DISCONNECTED: |
|                                 // handled by the reconnect logic below |
|                                 throw new ConnectException(""); |
|                             case Server.S_LOST: |
|                             case Server.S_NOT_STARTED: |
|                             case Server.S_STOPPED: |
|                                 return; |
|                             } |
|                         } |
|                     } |
|                 } |
|                 catch (final ConnectException e) { |
|                     synchronized (NodeServer.this.srvEngine) { |
|                         if (NodeServer.this.isConsoleEnabled) { |
|                             // a real console client took over: give up the dummy role |
|                             NodeServer.this.isConsoleDummyRunning= false; |
|                             ownsRunningFlag= false; |
|                             return; |
|                         } |
|                         NodeServer.this.srvEngine.connect(this.client, new HashMap<String, Object>()); |
|                     } |
|                 } |
|                 catch (final Exception e) { |
|                     if (error) { |
|                         // second failure in a row: stop the REPL (logged by the outer handler) |
|                         throw e; |
|                     } |
|                     LOGGER.log(Level.SEVERE, "An error occurred when running dummy R REPL. Trying to continue REPL.", e); |
|                     error= true; |
|                 } |
|                 if (sendCom == null) { |
|                     try { |
|                         sleep(5000); |
|                     } |
|                     catch (final InterruptedException ignored) { |
|                         // intentionally ignored: an interrupt only shortens the pause of this daemon loop |
|                     } |
|                 } |
|             } |
|         } |
|         catch (final Exception e) { |
|             LOGGER.log(Level.SEVERE, "An error occurred when running dummy R REPL. Stopping REPL.", e); |
|         } |
|         finally { |
|             // keep the flag in sync with the real thread state, whatever the reason of the exit |
|             if (ownsRunningFlag) { |
|                 synchronized (NodeServer.this.srvEngine) { |
|                     NodeServer.this.isConsoleDummyRunning= false; |
|                 } |
|             } |
|         } |
|     } |
|  |
| } |
| |
| /** |
|  * Implementation of the node control interface; all operations work on the |
|  * enclosing {@link NodeServer}. |
|  */ |
| class Node implements RServiNode { |
|  |
|     /** |
|      * Enables or disables the console access of this node. |
|      * |
|      * A currently connected client is disconnected first. With an auth |
|      * configuration, the auth method is created from it and the console is |
|      * enabled; without ({@code null}), the internal no-auth method is set, the |
|      * console is disabled and a {@link ConsoleDummy} is started if none is |
|      * marked as running. |
|      * |
|      * @param authConfig the auth configuration or {@code null} to disable the console |
|      * @return {@code true} if the console is enabled now, otherwise {@code false} |
|      */ |
|     @Override |
|     public boolean setConsole(final String authConfig) throws RemoteException, RjException { |
|         final boolean enable= (authConfig != null); |
|         synchronized (NodeServer.this.srvEngine) { |
|             final Client connected= NodeServer.this.srvEngine.getCurrentClient(); |
|             if (connected != null) { |
|                 NodeServer.this.srvEngine.disconnect(connected); |
|             } |
|             if (enable) { |
|                 NodeServer.this.authMethod= NodeServer.this.control.createServerAuth(authConfig); |
|                 NodeServer.this.isConsoleEnabled= true; |
|             } |
|             else { |
|                 NodeServer.this.authMethod= new NoAuthMethod("<internal>"); |
|                 NodeServer.this.isConsoleEnabled= false; |
|                 final boolean dummyMissing= !NodeServer.this.isConsoleDummyRunning; |
|                 if (dummyMissing) { |
|                     final ConsoleDummy dummy= new ConsoleDummy(NodeServer.this.consoleDummyClient); |
|                     dummy.start(); |
|                 } |
|             } |
|         } |
|         return enable; |
|     } |
|  |
|     /** Always reports {@code 0}. */ |
|     @Override |
|     public int getEvalTime() throws RemoteException { |
|         return 0; |
|     } |
|  |
|     /** Does nothing; a normal return of the remote call is the answer. */ |
|     @Override |
|     public void ping() throws RemoteException { |
|     } |
|  |
|     /** |
|      * Returns the host of the client of the current remote call, or |
|      * {@code "<internal>"} if the method is not called within a remote call. |
|      */ |
|     @Override |
|     public String getPoolHost() throws RemoteException { |
|         String host; |
|         try { |
|             host= RemoteServer.getClientHost(); |
|         } |
|         catch (final ServerNotActiveException e) { |
|             host= "<internal>"; |
|         } |
|         return host; |
|     } |
|  |
|     /** |
|      * Evaluates the given code in R, discarding the result. |
|      * |
|      * @param code the R code to evaluate |
|      */ |
|     @Override |
|     public void runSnippet(final String code) throws RemoteException, RjException { |
|         final DataCmdItem evalCmd= new DataCmdItem(DataCmdItem.EVAL_EXPR_VOID, 0, |
|                 code, null, null, null ); |
|         runServerLoopCommand(null, evalCmd); |
|     } |
|  |
|     /** Delegates to {@link NodeServer#bindClient(String)}. */ |
|     @Override |
|     public RServiBackend bindClient(final String client) throws RemoteException { |
|         final RServiBackend backend= NodeServer.this.bindClient(client); |
|         return backend; |
|     } |
|  |
|     /** Delegates to {@link NodeServer#unbindClient()}. */ |
|     @Override |
|     public void unbindClient() throws RemoteException { |
|         NodeServer.this.unbindClient(); |
|     } |
|  |
|     /** Delegates to {@link NodeServer#shutdown()}. */ |
|     @Override |
|     public void shutdown() throws RemoteException { |
|         NodeServer.this.shutdown(); |
|     } |
|  |
| } |
| |
| /** |
|  * Backend object exported to the client bound to this node; the instance is |
|  * passed as caller to the enclosing {@link NodeServer}, which compares it with |
|  * its current client backend. |
|  */ |
| class Backend implements RServiBackend { |
|  |
|     /** Always returns {@code null}. */ |
|     @Override |
|     public Server getPublic() throws RemoteException { |
|         return null; |
|     } |
|  |
|     /** Returns the platform data of the R engine. */ |
|     @Override |
|     public Map<String, Object> getPlatformData() { |
|         final Map<String, Object> data= NodeServer.this.srvEngine.getPlatformData(); |
|         return data; |
|     } |
|  |
|     /** Forwards the properties to the node server, with this backend as caller. */ |
|     @Override |
|     public void setProperties(final Map<String, ? extends Object> properties) throws RemoteException { |
|         NodeServer.this.setProperties(properties, this); |
|     } |
|  |
|     /** |
|      * Not supported by this backend. |
|      * |
|      * @throws UnsupportedOperationException always |
|      */ |
|     public boolean interrupt() throws RemoteException { |
|         throw new UnsupportedOperationException(); |
|     } |
|  |
|     /** |
|      * Not supported by this backend. |
|      * |
|      * @throws UnsupportedOperationException always |
|      */ |
|     @Override |
|     public void disconnect() throws RemoteException { |
|         throw new UnsupportedOperationException(); |
|     } |
|  |
|     /** Runs the main loop of the node server, with this backend as caller. */ |
|     @Override |
|     public RjsComObject runMainLoop(final RjsComObject com) throws RemoteException { |
|         final RjsComObject answer= NodeServer.this.runMainLoop(com, this); |
|         return answer; |
|     } |
|  |
|     /** Runs the asynchronous command at the node server, with this backend as caller. */ |
|     @Override |
|     public RjsComObject runAsync(final RjsComObject com) throws RemoteException { |
|         final RjsComObject answer= NodeServer.this.runAsync(com, this); |
|         return answer; |
|     } |
|  |
|     /** Returns {@code true} if this backend is not the current client backend of the node server. */ |
|     @Override |
|     public boolean isClosed() throws RemoteException { |
|         final Backend active= NodeServer.this.currentClientBackend; |
|         return (active != this); |
|     } |
|  |
| } |
| |
| |
| /** Whether console clients are allowed to connect; accessed while holding the monitor of the engine. */ |
| private boolean isConsoleEnabled; |
| /** Set by a {@code ConsoleDummy} thread once it has connected its client; accessed while holding the monitor of the engine. */ |
| private boolean isConsoleDummyRunning; |
|  |
| /** Auth method returned by {@code getAuthMethod} for commands starting with "rservi". */ |
| private final ServerAuthMethod rserviAuthMethod; |
|  |
| /** Client used to start the engine and passed to the {@code ConsoleDummy} threads. */ |
| private final Client consoleDummyClient; |
|  |
| /** Id of the bound client; set in {@code bindClient}, cleared in {@code unbindClient}. */ |
| private String currentClientId; |
| /** Backend of the bound client, or {@code null} if no client is bound. */ |
| private Backend currentClientBackend; |
| /** Exported remote object of {@code currentClientBackend}. */ |
| private RServiBackend currentClientExp; |
|  |
| /** Monitor held while running internal server loop commands and while checking the caller of backend calls. */ |
| private final Object serviRunLock= new Object(); |
|  |
| /** R code evaluated to reset the workspace; created in {@code start}. */ |
| private String resetCommand; |
|  |
| /** RMI socket factories used to export remote objects; assigned only if the option "ssl" is set. */ |
| private RMIClientSocketFactory clientSocketFactory; |
| private RMIServerSocketFactory serverSocketFactory; |
| |
| |
| public NodeServer(final RMIServerControl control) { |
| super(control, new NoAuthMethod("<internal>")); //$NON-NLS-1$ |
| this.rserviAuthMethod= new NoAuthMethod("<internal>"); //$NON-NLS-1$ |
| this.consoleDummyClient= new Client("-", "dummy", (byte) 0); //$NON-NLS-1$ |
| |
| if (control.getOptions().containsKey("ssl")) { //$NON-NLS-1$ |
| this.clientSocketFactory= new SslRMIClientSocketFactory(); |
| this.serverSocketFactory= new SslRMIServerSocketFactory(null, null, true); |
| } |
| } |
| |
| |
| /** |
|  * Configuration flag of this server type. |
|  * |
|  * @return always {@code false} |
|  */ |
| @Override |
| public boolean getConfigUnbindOnStartup() { |
|     return false; |
| } |
| |
| |
| /** |
|  * Starts the server and the R engine and prepares the R workspace for the |
|  * first client. |
|  * |
|  * After the start of the base server, the reset command is created (clear the |
|  * workspace, run gc, define {@code .rj.getTmp}, restore the working directory, |
|  * close graphic devices), this server is registered as server path resolver and |
|  * the engine is started with the console dummy client. Finally the R package |
|  * 'rj' is loaded and the reset command is evaluated once. |
|  * |
|  * @param runtimePlugin passed to the start method of the super class |
|  * @throws RjException if loading the package or the initial reset fails |
|  * @throws Exception if the start of the base server or of the engine fails |
|  */ |
| @Override |
| public void start(final ServerRuntimePlugin runtimePlugin) throws Exception { |
|     super.start(runtimePlugin); |
|  |
|     // The path is embedded in an R string literal in double quotes: escape |
|     // backslashes first, then double quotes, so that the literal stays valid. |
|     final String wdLiteral= this.workingDirectory |
|             .replace("\\", "\\\\") |
|             .replace("\"", "\\\""); |
|     this.resetCommand= "{" + |
|             "rm(list=ls());" + |
|             "gc();" + |
|             ".rj.getTmp<-function(o){x<-get(o,pos=.GlobalEnv);rm(list=o,pos=.GlobalEnv);x};" + |
|             ".rj.wd<-\""+wdLiteral+"\";" + |
|             "setwd(.rj.wd);" + |
|             "graphics.off();" + |
|             "}"; |
|     RjsComConfig.setServerPathResolver(this); |
|  |
|     final Map<String, Object> properties= new HashMap<>(); |
|     properties.put("args", new String[0]); |
|     this.srvEngine.start(this.consoleDummyClient, properties); |
|  |
|     try { |
|         synchronized (this.serviRunLock) { |
|             LOGGER.log(Level.FINE, "Initializing R node: Loading R package 'rj'..."); |
|             runServerLoopCommand(null, new DataCmdItem(DataCmdItem.EVAL_EXPR_VOID, 0, |
|                     "library(rj)", null, null, null )); |
|             LOGGER.log(Level.FINE, "Initializing R node: Preparing R workspace for first client..."); |
|             runServerLoopCommand(null, new DataCmdItem(DataCmdItem.EVAL_EXPR_VOID, 0, |
|                     this.resetCommand, null, null, null )); |
|         } |
|     } |
|     catch (final Exception e) { |
|         throw new RjException("An error occurred while preparing initially the workspace.", e); |
|     } |
|  |
|     LOGGER.log(Level.FINE, "Initializing R node: R engine started and initialized."); |
| } |
| |
| @Override |
| protected ServerAuthMethod getAuthMethod(final String command) { |
| if (command.startsWith("rservi")) { |
| return this.rserviAuthMethod; |
| } |
| return super.getAuthMethod(command); |
| } |
| |
| /** |
|  * Executes a server command. |
|  * |
|  * Supported are connecting a console client (only while the console is |
|  * enabled) and requesting the node control object, which is exported as a |
|  * new remote object for each request. |
|  * |
|  * @param command the command id |
|  * @param properties the properties passed to the engine when connecting a console client |
|  * @param login the login used to connect the client |
|  * @return the result of the engine connect, or the exported node control object |
|  * @throws RemoteException if the console is not enabled, or from the underlying calls |
|  * @throws UnsupportedOperationException for the console start command and all unknown commands |
|  */ |
| @Override |
| public Object execute(final String command, final Map<String, ? extends Object> properties, final ServerLogin login) throws RemoteException, LoginException { |
|     if (command.equals(C_CONSOLE_START)) { |
|         throw new UnsupportedOperationException(); |
|     } |
|     if (command.equals(C_CONSOLE_CONNECT)) { |
|         synchronized (this.srvEngine) { |
|             if (this.isConsoleEnabled) { |
|                 final Client consoleClient= connectClient(command, login); |
|                 return this.srvEngine.connect(consoleClient, properties); |
|             } |
|             throw new RemoteException("Console is not enabled."); |
|         } |
|     } |
|     if (command.equals(C_RSERVI_NODECONTROL)) { |
|         // the returned client is not required here, the call performs the login |
|         connectClient(command, login); |
|         final Node control= new Node(); |
|         return (RServiNode) UnicastRemoteObject.exportObject(control, 0, |
|                 this.clientSocketFactory, this.serverSocketFactory ); |
|     } |
|     throw new UnsupportedOperationException(); |
| } |
| |
| |
| /** |
|  * Binds a client to this node by creating and exporting a new backend. |
|  * |
|  * @param client the id of the client |
|  * @return the exported backend |
|  * @throws IllegalStateException if a client is already bound |
|  * @throws RemoteException if the export of the backend fails |
|  */ |
| RServiBackend bindClient(final String client) throws RemoteException { |
|     synchronized (this.serverClient) { |
|         if (this.currentClientBackend != null) { |
|             throw new IllegalStateException(); |
|         } |
|         final Backend newBackend= new Backend(); |
|         final RServiBackend exported= (RServiBackend) UnicastRemoteObject.exportObject(newBackend, 0, |
|                 this.clientSocketFactory, this.serverSocketFactory ); |
|  |
|         this.currentClientId= client; |
|         this.currentClientBackend= newBackend; |
|         this.currentClientExp= exported; |
|  |
|         SrvEngineServer.addClient(exported); |
|         return exported; |
|     } |
| } |
| |
| void unbindClient() throws RemoteException { |
| synchronized (this.serverClient) { |
| final Backend previous= this.currentClientBackend; |
| if (previous != null) { |
| SrvEngineServer.removeClient(this.currentClientExp); |
| this.currentClientId= null; |
| this.currentClientBackend= null; |
| this.currentClientExp= null; |
| UnicastRemoteObject.unexportObject(previous, true); |
| try { |
| synchronized (this.serviRunLock) { |
| runServerLoopCommand(null, new DataCmdItem(DataCmdItem.EVAL_EXPR_VOID, 0, |
| this.resetCommand, null, null, null )); |
| ServerUtils.cleanDir(new File(this.workingDirectory), "out.log"); |
| } |
| } |
| catch (final Exception e) { |
| throw new RemoteException("An error occurred while resetting the workspace.", e); |
| } |
| } |
| } |
| } |
| |
| /** |
|  * Shuts down this node. |
|  * |
|  * Triggers the cleanup check of the control and schedules a task on a daemon |
|  * timer which, after 500 ms, unbinds the current client and exits the JVM |
|  * with exit code 0. The delay allows this method (and a remote call which |
|  * invoked it) to return before the JVM exits. |
|  * |
|  * A failure while unbinding the client is logged; the JVM exits in any case. |
|  */ |
| void shutdown() { |
|     this.control.checkCleanup(); |
|     final TimerTask exitTask= new TimerTask() { |
|         @Override |
|         public void run() { |
|             try { |
|                 unbindClient(); |
|             } |
|             catch (final Exception e) { |
|                 LOGGER.log(Level.SEVERE, "An error occurred when unbinding the client during shutdown.", e); |
|             } |
|             finally { |
|                 // exit even if unbinding failed with an unexpected error |
|                 System.exit(0); |
|             } |
|         } |
|     }; |
|     new Timer(true).schedule(exitTask, 500L); |
| } |
| |
| /** |
|  * Sets properties at the R engine for the server client. |
|  * |
|  * @param properties the properties to set |
|  * @param caller the calling backend, or {@code null} to skip the caller check |
|  * @throws IllegalAccessError if a caller is specified which is not the current client backend |
|  */ |
| public void setProperties(final Map<String, ? extends Object> properties, final Object caller) throws RemoteException { |
|     synchronized (this.serviRunLock) { |
|         final boolean permitted= (caller == null || this.currentClientBackend == caller); |
|         if (!permitted) { |
|             throw new IllegalAccessError(); |
|         } |
|         this.srvEngine.setProperties(this.serverClient, properties); |
|     } |
| } |
| |
| @Override |
| protected RjsComObject runMainLoop(final RjsComObject com, final Object caller) throws RemoteException { |
| synchronized (this.serviRunLock) { |
| if (caller != null && this.currentClientBackend != caller) { |
| throw new IllegalAccessError(); |
| } |
| } |
| return super.runMainLoop(com, caller); |
| } |
| |
| /** |
|  * Runs an asynchronous command at the R engine for the server client. |
|  * |
|  * @param com the object to send |
|  * @param backend the calling backend, or {@code null} to skip the caller check |
|  * @return the answer of the engine |
|  * @throws IllegalAccessError if a backend is specified which is not the current client backend |
|  */ |
| private RjsComObject runAsync(final RjsComObject com, final Backend backend) throws RemoteException { |
|     if (backend == null || this.currentClientBackend == backend) { |
|         return this.srvEngine.runAsync(this.serverClient, com); |
|     } |
|     throw new IllegalAccessError(); |
| } |
| |
| } |