| /*=============================================================================# |
| # Copyright (c) 2009, 2020 Stephan Wahlbrink and others. |
| # |
| # This program and the accompanying materials are made available under the |
| # terms of the Eclipse Public License 2.0 which is available at |
| # https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 |
| # which is available at https://www.apache.org/licenses/LICENSE-2.0. |
| # |
| # SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 |
| # |
| # Contributors: |
| # Stephan Wahlbrink <sw@wahlbrink.eu> - initial API and implementation |
| #=============================================================================*/ |
| |
| package org.eclipse.statet.internal.rj.servi.server; |
| |
| import java.nio.file.DirectoryStream; |
| import java.nio.file.Path; |
| import java.rmi.ConnectException; |
| import java.rmi.RemoteException; |
| import java.rmi.server.RemoteServer; |
| import java.rmi.server.ServerNotActiveException; |
| import java.rmi.server.UnicastRemoteObject; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.logging.Level; |
| |
| import javax.security.auth.login.LoginException; |
| |
| import org.eclipse.statet.jcommons.io.FileUtils; |
| import org.eclipse.statet.jcommons.lang.NonNull; |
| import org.eclipse.statet.jcommons.lang.NonNullByDefault; |
| import org.eclipse.statet.jcommons.lang.Nullable; |
| |
| import org.eclipse.statet.rj.RjException; |
| import org.eclipse.statet.rj.server.DataCmdItem; |
| import org.eclipse.statet.rj.server.MainCmdC2SList; |
| import org.eclipse.statet.rj.server.MainCmdItem; |
| import org.eclipse.statet.rj.server.MainCmdS2CList; |
| import org.eclipse.statet.rj.server.RjsComConfig; |
| import org.eclipse.statet.rj.server.RjsComObject; |
| import org.eclipse.statet.rj.server.RjsStatus; |
| import org.eclipse.statet.rj.server.Server; |
| import org.eclipse.statet.rj.server.ServerLogin; |
| import org.eclipse.statet.rj.server.srv.RMIServerControl; |
| import org.eclipse.statet.rj.server.srv.engine.SrvEngineServer; |
| import org.eclipse.statet.rj.server.srvext.Client; |
| import org.eclipse.statet.rj.server.srvext.ServerAuthMethod; |
| import org.eclipse.statet.rj.server.srvext.ServerRuntimePlugin; |
| import org.eclipse.statet.rj.server.srvext.auth.NoAuthMethod; |
| import org.eclipse.statet.rj.servi.node.RServiNode; |
| |
| |
| @NonNullByDefault |
| public class NodeServer extends SrvEngineServer { |
| |
| |
| class ConsoleMockup extends Thread { |
| |
| private final Client client; |
| |
| private final MainCmdC2SList c2sList= new MainCmdC2SList(); |
| |
| private int waitMillis= 5000; |
| |
| ConsoleMockup() { |
| setName("R Console"); |
| setDaemon(true); |
| setPriority(NORM_PRIORITY-1); |
| this.client= NodeServer.this.consoleMockupClient; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| synchronized (NodeServer.this.srvEngine) { |
| if (NodeServer.this.isConsoleEnabled || NodeServer.this.consoleMockup != null) { |
| return; |
| } |
| NodeServer.this.consoleMockup= this; |
| NodeServer.this.srvEngine.connect(this.client, new HashMap<String, Object>()); |
| } |
| |
| RjsComObject sendCom= null; |
| boolean error= false; |
| while (true) { |
| try { |
| if (sendCom == null) { |
| this.c2sList.setObjects(null); |
| sendCom= this.c2sList; |
| } |
| final RjsComObject receivedCom= NodeServer.this.srvEngine.runMainLoop(this.client, sendCom); |
| sendCom= null; |
| error= false; |
| if (receivedCom != null) { |
| switch (receivedCom.getComType()) { |
| case RjsComObject.T_PING: |
| sendCom= RjsStatus.OK_STATUS; |
| break; |
| case RjsComObject.T_MAIN_LIST: |
| MainCmdItem item= ((MainCmdS2CList) receivedCom).getItems(); |
| MainCmdItem tmp; |
| for (; (item != null); tmp= item, item= item.next, tmp.next= null) { |
| processServerCmdItem(item); |
| } |
| break; |
| case RjsComObject.T_STATUS: |
| switch (((RjsStatus)receivedCom).getCode()) { |
| case Server.S_DISCONNECTED: |
| throw new ConnectException(""); |
| case Server.S_LOST: |
| case Server.S_NOT_STARTED: |
| case Server.S_STOPPED: |
| synchronized (NodeServer.this.srvEngine) { |
| if (NodeServer.this.consoleMockup == this) { |
| NodeServer.this.consoleMockup= null; |
| } |
| return; |
| } |
| } |
| } |
| } |
| } |
| catch (final ConnectException e) { |
| synchronized (NodeServer.this.srvEngine) { |
| if (NodeServer.this.isConsoleEnabled) { |
| if (NodeServer.this.consoleMockup == this) { |
| NodeServer.this.consoleMockup= null; |
| } |
| return; |
| } |
| NodeServer.this.srvEngine.connect(this.client, new HashMap<String, Object>()); |
| } |
| } |
| catch (final Exception e) { |
| if (error) { |
| throw e; |
| } |
| LOGGER.log(Level.SEVERE, |
| "An error occurred when running dummy R REPL. Trying to continue REPL.", |
| e ); |
| error= true; |
| } |
| if (sendCom == null) { |
| try { |
| sleep(this.waitMillis); |
| } |
| catch (final InterruptedException e) { |
| } |
| } |
| } |
| } |
| catch (final Throwable e) { |
| LOGGER.log(Level.SEVERE, |
| "An error occurred when running dummy R REPL. Stopping REPL.", |
| e ); |
| } |
| } |
| |
| public void aboutToStop() { |
| this.waitMillis= 100; |
| interrupt(); |
| } |
| |
| } |
| |
| class Node implements RServiNode { |
| |
| @Override |
| public boolean setConsole(final @Nullable String authConfig) throws RemoteException, RjException { |
| final boolean enabled; |
| synchronized (NodeServer.this.srvEngine) { |
| // LOGGER.fine("enter lock"); |
| final Client currentClient= NodeServer.this.srvEngine.getCurrentClient(); |
| if (currentClient != null) { |
| NodeServer.this.srvEngine.disconnect(currentClient); |
| } |
| // LOGGER.fine("disconnect"); |
| if (authConfig != null) { |
| NodeServer.this.authMethod= NodeServer.this.control.createServerAuth(authConfig); |
| enabled= NodeServer.this.isConsoleEnabled= true; |
| } |
| else { |
| NodeServer.this.authMethod= new NoAuthMethod("<internal>"); |
| enabled= NodeServer.this.isConsoleEnabled= false; |
| // LOGGER.fine("before start"); |
| if (NodeServer.this.consoleMockup == null) { |
| new ConsoleMockup().start(); |
| } |
| // LOGGER.fine("after start"); |
| } |
| } |
| return enabled; |
| } |
| |
| @Override |
| public int getEvalTime() throws RemoteException { |
| return 0; |
| } |
| |
| @Override |
| public void ping() throws RemoteException { |
| } |
| |
| @Override |
| public String getPoolHost() throws RemoteException { |
| try { |
| return RemoteServer.getClientHost(); |
| } |
| catch (final ServerNotActiveException e) { |
| return "<internal>"; |
| } |
| } |
| |
| @Override |
| public void runSnippet(final String code) throws RemoteException, RjException { |
| runServerLoopCommand(new DataCmdItem(DataCmdItem.EVAL_EXPR_VOID, 0, |
| code, null, null, null )); |
| } |
| |
| @Override |
| public RServiBackend bindClient(final String client) throws RemoteException { |
| return NodeServer.this.bindClient(client); |
| } |
| |
| @Override |
| public void unbindClient() throws RemoteException { |
| NodeServer.this.unbindClient(); |
| } |
| |
| @Override |
| public void shutdown() throws RemoteException { |
| NodeServer.this.shutdown(); |
| } |
| |
| } |
| |
| class Backend implements RServiBackend { |
| |
| @Override |
| public @Nullable Server getPublic() throws RemoteException { |
| return null; |
| } |
| |
| @Override |
| public Map<String, Object> getPlatformData() { |
| return NodeServer.this.srvEngine.getPlatformData(); |
| } |
| |
| @Override |
| public void setProperties(final Map<String, ? extends @NonNull Object> properties) throws RemoteException { |
| NodeServer.this.setProperties(properties, this); |
| } |
| |
| public boolean interrupt() throws RemoteException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void disconnect() throws RemoteException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public RjsComObject runMainLoop(final RjsComObject com) throws RemoteException { |
| return NodeServer.this.runMainLoop(com, this); |
| } |
| |
| @Override |
| public RjsComObject runAsync(final RjsComObject com) throws RemoteException { |
| return NodeServer.this.runAsync(com, this); |
| } |
| |
| @Override |
| public boolean isClosed() throws RemoteException { |
| return (NodeServer.this.currentClientBackend != this); |
| } |
| |
| } |
| |
| |
| private boolean isConsoleEnabled; |
| |
| private final ServerAuthMethod rserviAuthMethod; |
| |
| private final Client consoleMockupClient; |
| private @Nullable ConsoleMockup consoleMockup; |
| |
| private @Nullable String currentClientLabel; |
| private @Nullable Backend currentClientBackend; |
| private @Nullable RServiBackend currentClientExp; |
| |
| private final Object serviRunLock= new Object(); |
| |
| private String resetCommand= ""; |
| |
| |
| public NodeServer(final RMIServerControl control) { |
| super(control, new NoAuthMethod("<internal>")); //$NON-NLS-1$ |
| this.rserviAuthMethod= new NoAuthMethod("<internal>"); //$NON-NLS-1$ |
| this.consoleMockupClient= new Client("-", "dummy", (byte)0); //$NON-NLS-1$ |
| } |
| |
| |
| @Override |
| public boolean getConfigUnbindOnStartup() { |
| return false; |
| } |
| |
| |
| @Override |
| public void start(final ServerRuntimePlugin runtimePlugin) throws Exception { |
| super.start(runtimePlugin); |
| |
| this.resetCommand= "{" + |
| "rm(list= ls(all.names= TRUE));" + |
| "gc();" + |
| ".rj.getTmp<-function(o){x<-get(o,pos=.GlobalEnv);rm(list=o,pos=.GlobalEnv);x};" + |
| ".rj.wd<-\"" + this.workingDirectory.toString().replace("\\", "\\\\") + "\";" + |
| "setwd(.rj.wd);" + |
| "graphics.off();" + |
| "}"; |
| RjsComConfig.setServerPathResolver(this); |
| |
| final Map<String, Object> properties= new HashMap<>(); |
| properties.put("args", new String[0]); |
| this.srvEngine.start(this.consoleMockupClient, properties); |
| |
| try { |
| synchronized (this.serviRunLock) { |
| LOGGER.log(Level.FINE, "Initializing R node: Loading R package 'rj'..."); |
| runServerLoopCommand(new DataCmdItem(DataCmdItem.EVAL_EXPR_VOID, 0, |
| "library(rj)", null, null, null )); |
| LOGGER.log(Level.FINE, "Initializing R node: Preparing R workspace for first client..."); |
| runServerLoopCommand(new DataCmdItem(DataCmdItem.EVAL_EXPR_VOID, 0, |
| this.resetCommand, null, null, null )); |
| } |
| } |
| catch (final Exception e) { |
| throw new RjException("An error occurred while preparing initially the workspace.", e); |
| } |
| |
| LOGGER.log(Level.FINE, "Initializing R node: R engine started and initialized."); |
| } |
| |
| @Override |
| protected ServerAuthMethod getAuthMethod(final String command) { |
| if (command.startsWith("rservi")) { |
| return this.rserviAuthMethod; |
| } |
| return super.getAuthMethod(command); |
| } |
| |
| @Override |
| public Object execute(final String command, final Map<String, ? extends @NonNull Object> args, |
| final ServerLogin login) throws RemoteException, LoginException { |
| if (command.equals(C_CONSOLE_START)) { |
| throw new UnsupportedOperationException(); |
| } |
| if (command.equals(C_CONSOLE_CONNECT)) { |
| synchronized (this.srvEngine) { |
| if (!this.isConsoleEnabled) { |
| throw new RemoteException("Console is not enabled."); |
| } |
| final Client client= connectClient(command, login); |
| return this.srvEngine.connect(client, args); |
| } |
| } |
| if (command.equals(C_RSERVI_NODECONTROL)) { |
| @SuppressWarnings("unused") |
| final Client client= connectClient(command, login); |
| final Node node= new Node(); |
| final RServiNode exported= (RServiNode)this.control.exportObject(node); |
| return exported; |
| } |
| throw new UnsupportedOperationException(); |
| } |
| |
| |
| RServiBackend bindClient(final String client) throws RemoteException { |
| synchronized (this.serverClient) { |
| if (NodeServer.this.currentClientBackend != null) { |
| throw new IllegalStateException(); |
| } |
| final Backend backend= new Backend(); |
| final RServiBackend export= (RServiBackend)this.control.exportObject(backend); |
| this.currentClientLabel= client; |
| this.currentClientBackend= backend; |
| this.currentClientExp= export; |
| SrvEngineServer.addClient(export); |
| return export; |
| } |
| } |
| |
| @SuppressWarnings("null") |
| private static final DirectoryStream.Filter<? super Path> CLEAN_WORKING_DIRECTORY_FILTER= |
| (final Path entry) -> !entry.getFileName().toString().equals("out.log"); |
| |
| void unbindClient() throws RemoteException { |
| synchronized (this.serverClient) { |
| final Backend previous= this.currentClientBackend; |
| if (previous != null) { |
| SrvEngineServer.removeClient(this.currentClientExp); |
| this.currentClientLabel= null; |
| this.currentClientBackend= null; |
| this.currentClientExp= null; |
| UnicastRemoteObject.unexportObject(previous, true); |
| try { |
| synchronized (this.serviRunLock) { |
| runServerLoopCommand(new DataCmdItem(DataCmdItem.EVAL_EXPR_VOID, 0, |
| this.resetCommand, null, null, null )); |
| FileUtils.cleanDirectory(this.workingDirectory, CLEAN_WORKING_DIRECTORY_FILTER); |
| } |
| } |
| catch (final Exception e) { |
| throw new RemoteException("An error occurred while resetting the workspace.", e); |
| } |
| } |
| } |
| } |
| |
| void shutdown() { |
| this.control.checkCleanup(); |
| |
| try { |
| LOGGER.log(Level.FINE, "Shutting down R node: Unbind client..."); |
| unbindClient(); |
| } |
| catch (final Exception e) { |
| LOGGER.log(Level.SEVERE, |
| "An error occurred when unbinding the client for shutdown.", |
| e ); |
| } |
| |
| final ConsoleMockup consoleMockup= this.consoleMockup; |
| if (consoleMockup != null) { |
| consoleMockup.aboutToStop(); |
| } |
| try { |
| LOGGER.log(Level.FINE, "Shutting down R node: Exit R engine..."); |
| synchronized (this.serviRunLock) { |
| runServerLoopCommand(new DataCmdItem(DataCmdItem.EVAL_EXPR_VOID, 0, |
| "base::q(\"no\")", null, null, null )); |
| } |
| } |
| catch (final RjExitException e) { |
| // expected |
| } |
| catch (final Exception e) { |
| LOGGER.log(Level.SEVERE, |
| "An error occurred when exiting the R engine for shutdown.", |
| e ); |
| } |
| } |
| |
| public void setProperties(final Map<String, ? extends @NonNull Object> properties, final Object caller) throws RemoteException { |
| synchronized (this.serviRunLock) { |
| if (caller != null && this.currentClientBackend != caller) { |
| throw new IllegalAccessError(); |
| } |
| this.srvEngine.setProperties(this.serverClient, properties); |
| } |
| } |
| |
| @Override |
| protected RjsComObject runMainLoop(final RjsComObject com, final Object caller) throws RemoteException { |
| synchronized (this.serviRunLock) { |
| if (caller != null && this.currentClientBackend != caller) { |
| throw new IllegalAccessError(); |
| } |
| } |
| return super.runMainLoop(com, caller); |
| } |
| |
| private RjsComObject runAsync(final RjsComObject com, final Backend backend) throws RemoteException { |
| if (backend != null && this.currentClientBackend != backend) { |
| throw new IllegalAccessError(); |
| } |
| return this.srvEngine.runAsync(this.serverClient, com); |
| } |
| |
| } |