| /*=============================================================================# |
| # Copyright (c) 2007, 2021 Stephan Wahlbrink and others. |
| # |
| # This program and the accompanying materials are made available under the |
| # terms of the Eclipse Public License 2.0 which is available at |
| # https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 |
| # which is available at https://www.apache.org/licenses/LICENSE-2.0. |
| # |
| # SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 |
| # |
| # Contributors: |
| # Stephan Wahlbrink <sw@wahlbrink.eu> - initial API and implementation |
| #=============================================================================*/ |
| |
| package org.eclipse.statet.r.nico.impl; |
| |
| import static org.eclipse.statet.nico.core.runtime.IToolEventHandler.RUN_BLOCKING_EVENT_ID; |
| import static org.eclipse.statet.nico.core.runtime.IToolEventHandler.RUN_RUNNABLE_DATA_KEY; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InputStreamReader; |
| import java.io.OutputStream; |
| import java.io.OutputStreamWriter; |
| import java.nio.charset.Charset; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| |
| import org.eclipse.core.filesystem.EFS; |
| import org.eclipse.core.runtime.Path; |
| import org.eclipse.core.runtime.Platform; |
| import org.eclipse.debug.core.IStreamListener; |
| import org.eclipse.debug.core.model.IStreamMonitor; |
| |
| import org.eclipse.statet.jcommons.lang.NonNullByDefault; |
| import org.eclipse.statet.jcommons.lang.Nullable; |
| import org.eclipse.statet.jcommons.status.ErrorStatus; |
| import org.eclipse.statet.jcommons.status.ProgressMonitor; |
| import org.eclipse.statet.jcommons.status.Status; |
| import org.eclipse.statet.jcommons.status.StatusException; |
| import org.eclipse.statet.jcommons.ts.core.BasicToolCommandData; |
| import org.eclipse.statet.jcommons.ts.core.ToolCommandData; |
| import org.eclipse.statet.jcommons.ts.core.ToolCommandHandler; |
| import org.eclipse.statet.jcommons.ts.core.ToolRunnable; |
| import org.eclipse.statet.jcommons.ts.core.ToolService; |
| |
| import org.eclipse.statet.internal.r.console.core.RConsoleCorePlugin; |
| import org.eclipse.statet.internal.r.nico.RNicoMessages; |
| import org.eclipse.statet.nico.core.runtime.ConsoleService; |
| import org.eclipse.statet.nico.core.runtime.IRequireSynch; |
| import org.eclipse.statet.nico.core.runtime.Prompt; |
| import org.eclipse.statet.nico.core.runtime.SubmitType; |
| import org.eclipse.statet.nico.core.runtime.ToolStatus; |
| import org.eclipse.statet.nico.core.runtime.ToolStreamMonitor; |
| import org.eclipse.statet.nico.core.runtime.ToolStreamProxy; |
| import org.eclipse.statet.r.console.core.AbstractRController; |
| import org.eclipse.statet.r.console.core.RProcess; |
| import org.eclipse.statet.r.console.core.RWorkspace; |
| import org.eclipse.statet.r.core.RUtil; |
| |
| |
| /** |
| * Controller for RTerm. |
| */ |
| @NonNullByDefault |
| public class RTermController extends AbstractRController implements IRequireSynch { |
| |
| |
| private static final Pattern INT_OUTPUT_PATTERN= Pattern.compile("\\Q[1] \\E(\\d*)"); //$NON-NLS-1$ |
| private static final Pattern STRING_OUTPUT_PATTERN= Pattern.compile("\\Q[1] \"\\E((?:\\Q\\\"\\E|[^\"])*)\\\""); //$NON-NLS-1$ |
| |
| |
| private class ReadThread extends Thread { |
| |
| |
| private final InputStreamReader reader; |
| |
| volatile int hasNoOutput; |
| private final int SYNC_COUNT= 2; |
| private final int SYNC_MS= 33; |
| |
| final Lock streamLock= new ReentrantLock(); |
| |
| public ReadThread(final InputStream output) { |
| super("Rterm-Output Monitor"); //$NON-NLS-1$ |
| this.reader= new InputStreamReader(output, RTermController.this.charset); |
| } |
| |
| @Override |
| public void run() { |
| final ToolStreamProxy streams= getStreams(); |
| boolean locked= false; |
| try { |
| boolean canRead= false; |
| final char[] b= new char[1024]; |
| while (RTermController.this.process != null | (canRead= this.reader.ready())) { |
| if (canRead || this.hasNoOutput > this.SYNC_COUNT) { |
| if (!canRead && locked) { |
| this.streamLock.unlock(); |
| locked= false; |
| } |
| int n= this.reader.read(b); |
| if (n > 0) { |
| this.hasNoOutput= 0; |
| if (!locked) { |
| this.streamLock.lock(); |
| locked= true; |
| } |
| final String s= new String(b, 0, n); |
| streams.getOutputStreamMonitor().append(s, SubmitType.CONSOLE, 0); |
| n= s.length(); |
| if (n >= 2 && s.charAt(--n) == ' ' && (s.charAt(--n) == '>' || s.charAt(n) == '+')) { |
| this.hasNoOutput++; |
| final Thread thread= getControllerThread(); |
| if (thread != null) { |
| thread.interrupt(); |
| } |
| } |
| continue; |
| } |
| else if (n < 0) { |
| onRTerminated(); |
| return; |
| } |
| } |
| try { |
| Thread.sleep(this.SYNC_MS); |
| this.hasNoOutput++; |
| } |
| catch (final InterruptedException e) { |
| // continue directly |
| } |
| } |
| } |
| catch (final IOException e) { |
| onRTerminated(); |
| return; |
| } |
| finally { |
| if (locked) { |
| this.streamLock.unlock(); |
| locked= false; |
| } |
| try { |
| this.reader.close(); |
| } catch (final IOException e1) { |
| } |
| } |
| } |
| |
| private void onRTerminated() { |
| markAsTerminated(); |
| synchronized (getQueue()) { |
| getQueue().notifyAll(); |
| } |
| } |
| } |
| |
| |
| private final ProcessBuilder processConfig; |
| private final Charset charset; |
| private @Nullable Process process; |
| private OutputStreamWriter processInputWriter; |
| private ReadThread processOutputThread; |
| |
| |
/**
 * Creates the controller.
 * 
 * Installs a workspace whose refresh asks the tool for {@code getwd()} and
 * initializes the workspace directory with the directory of the process
 * configuration.
 * 
 * @param process the R process handle passed on to the super class
 * @param config the configuration used to start the Rterm process
 * @param charset the charset used for the input and output of the process
 */
public RTermController(final RProcess process, final ProcessBuilder config, final Charset charset) {
    super(process, null);
    this.processConfig= config;
    this.charset= charset;
    
    setWorksapceData(new RWorkspace(this, null, null) {
        @Override
        protected void refreshFromTool(final AbstractRController controller, final int options,
                final ProgressMonitor m) throws StatusException {
            final boolean queryWorkingDir= ((options & RWorkspace.REFRESH_COMPLETE) != 0
                    || (options & RWorkspace.REFRESH_AUTO) == 0 );
            if (queryWorkingDir) {
                final StringBuilder line= readOutputLine("getwd()", m); //$NON-NLS-1$
                if (line != null) {
                    final Matcher wdMatcher= STRING_OUTPUT_PATTERN.matcher(line);
                    if (wdMatcher.find()) {
                        final String dir= wdMatcher.group(1);
                        setWorkspaceDirL(EFS.getLocalFileSystem().getStore(new Path(dir)));
                    }
                }
            }
            clearBriefedChanges();
        }
    });
    setWorkspaceDirL(EFS.getLocalFileSystem().fromLocalFile(config.directory()));
    initRunnableAdapterL();
}
| |
| @Override |
| protected ToolRunnable createStartRunnable() { |
| return new StartRunnable() { |
| @Override |
| public String getLabel() { |
| return RNicoMessages.Rterm_StartTask_label; |
| } |
| }; |
| } |
| |
| @Override |
| protected void startToolL(final ProgressMonitor m) throws StatusException { |
| OutputStream processInput= null; |
| try { |
| final List<Status> warnings= new ArrayList<>(); |
| |
| this.processConfig.redirectErrorStream(true); |
| final Process process= this.processConfig.start(); |
| this.process= process; |
| this.processOutputThread= new ReadThread(process.getInputStream()); |
| this.processOutputThread.start(); |
| processInput= process.getOutputStream(); |
| this.processInputWriter= new OutputStreamWriter(processInput, this.charset); |
| setCurrentPromptL(this.fDefaultPrompt); |
| |
| initTracks(m, warnings); |
| |
| if (!this.startupsRunnables.isEmpty()) { |
| getQueue().add(this.startupsRunnables); |
| this.startupsRunnables.clear(); |
| } |
| |
| scheduleControllerRunnable(new ControllerSystemRunnable( |
| "r/rj/start2", "Finish Initialization") { //$NON-NLS-1$ |
| |
| @Override |
| public void run(final ToolService service, |
| final ProgressMonitor m) throws StatusException { |
| for (final Status status : warnings) { |
| handleStatus(status, m); |
| } |
| } |
| |
| }); |
| } |
| catch (final IOException e) { |
| if (processInput != null) { |
| try { |
| processInput.close(); |
| } catch (final IOException e1) {} |
| } |
| throw new StatusException(new ErrorStatus(RConsoleCorePlugin.BUNDLE_ID, |
| RNicoMessages.RTerm_error_Starting_message, |
| e )); |
| } |
| } |
| |
| @Nullable Process getProcess() { |
| return this.process; |
| } |
| |
| @Override |
| protected void interruptTool() { |
| runSendCtrlC(); |
| } |
| |
| @Override |
| protected void postCancelTask(final int options, |
| final ProgressMonitor m) throws StatusException { |
| this.fCurrentInput= ""; //$NON-NLS-1$ |
| doSubmitL(m); |
| this.fCurrentInput= ""; //$NON-NLS-1$ |
| doSubmitL(m); |
| } |
| |
| @Override |
| protected void killTool(final ProgressMonitor m) { |
| final Process process= this.process; |
| if (process != null) { |
| process.destroy(); |
| this.process= null; |
| } |
| } |
| |
| @Override |
| protected boolean isToolAlive() { |
| final Process process= this.process; |
| if (process != null) { |
| try { |
| process.exitValue(); |
| } |
| catch (final IllegalThreadStateException e) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| @Override |
| protected void clear() { |
| this.process= null; |
| super.clear(); |
| } |
| |
| |
| private boolean runSendCtrlC() { |
| if (!Platform.getOS().startsWith("win") //$NON-NLS-1$ |
| || getStatus() == ToolStatus.TERMINATED) { |
| return false; |
| } |
| |
| final ToolCommandHandler handler= getCommandHandler(RUN_BLOCKING_EVENT_ID); |
| if (handler != null) { |
| final RTermCancelRunnable cancelRunnable= new RTermCancelRunnable(); |
| final ToolCommandData data= new BasicToolCommandData(Map.of( |
| RUN_RUNNABLE_DATA_KEY, cancelRunnable )); |
| final Status status= executeHandler(RUN_BLOCKING_EVENT_ID, handler, data, null); |
| return (status != null && status.getSeverity() == Status.OK); |
| } |
| return false; |
| } |
| |
| |
//-- RunnableAdapter
| |
| @Override |
| protected void doBeforeSubmitL() { |
| final ToolStreamProxy streams= getStreams(); |
| final SubmitType submitType= getCurrentSubmitType(); |
| // adds control stream |
| // without prompt |
| try { |
| this.processOutputThread.streamLock.lock(); |
| streams.getInputStreamMonitor().append(this.fCurrentInput, submitType, |
| (this.fCurrentPrompt.meta & ConsoleService.META_HISTORY_DONTADD) ); |
| streams.getInputStreamMonitor().append(getWorkspaceData().getLineSeparator(), submitType, |
| ConsoleService.META_HISTORY_DONTADD); |
| } |
| finally { |
| this.processOutputThread.streamLock.unlock(); |
| } |
| } |
| |
| @Override |
| protected void doSubmitL(final ProgressMonitor m) { |
| m.beginSubTask(this.fDefaultPrompt.text + " " + this.fCurrentInput); //$NON-NLS-1$ |
| |
| try { |
| this.processInputWriter.write(this.fCurrentInput + this.fLineSeparator); |
| this.processInputWriter.flush(); |
| } |
| catch (final IOException e) { |
| RConsoleCorePlugin.logError("Rterm IO error", e); //$NON-NLS-1$ |
| if (!isToolAlive()) { |
| markAsTerminated(); |
| setCurrentPromptL(Prompt.NONE); |
| return; |
| } |
| } |
| |
| try { |
| Thread.sleep(this.processOutputThread.SYNC_MS*2); |
| } |
| catch (final InterruptedException e) { |
| // continue directly |
| } |
| this.processOutputThread.streamLock.lock(); |
| this.processOutputThread.streamLock.unlock(); |
| |
| setCurrentPromptL(this.fDefaultPrompt); |
| } |
| |
| @Override |
| public Pattern synch(final ProgressMonitor m) throws StatusException { |
| final ToolStreamMonitor stream= getStreams().getOutputStreamMonitor(); |
| final String stamp= "Synch"+System.nanoTime(); //$NON-NLS-1$ |
| final AtomicBoolean patternFound= new AtomicBoolean(false); |
| final IStreamListener listener= new IStreamListener() { |
| |
| private String lastLine= ""; //$NON-NLS-1$ |
| |
| @Override |
| public void streamAppended(final String text, final IStreamMonitor monitor) { |
| if (text.contains(stamp)) { |
| found(); |
| return; |
| } |
| final String[] lines= RUtil.LINE_SEPARATOR_PATTERN.split(text, -1); |
| if ((this.lastLine + lines[0]).contains(stamp)) { |
| found(); |
| return; |
| } |
| this.lastLine= lines[lines.length-1]; |
| } |
| |
| private void found() { |
| stream.removeListener(this); |
| patternFound.set(true); |
| } |
| |
| }; |
| try { |
| stream.addListener(listener); |
| submitToConsole("cat(\"" + stamp + "\\n\");", m); //$NON-NLS-1$ //$NON-NLS-2$ |
| while (!patternFound.get()) { |
| if (m.isCanceled()) { |
| throw cancelTask(); |
| } |
| try { |
| Thread.sleep(50); |
| } |
| catch (final InterruptedException e) { |
| // continue directly |
| } |
| } |
| return Pattern.compile("(?:" + Pattern.quote(getWorkspaceData().getDefaultPrompt().text) + ")?" + stamp); //$NON-NLS-1$ //$NON-NLS-2$ |
| } |
| finally { |
| stream.removeListener(listener); |
| } |
| } |
| |
| private @Nullable StringBuilder readOutputLine(final String command, |
| final ProgressMonitor m) throws StatusException { |
| final ToolStreamMonitor stream= getStreams().getOutputStreamMonitor(); |
| final StringBuilder output= new StringBuilder(); |
| final AtomicBoolean patternFound= new AtomicBoolean(false); |
| final IStreamListener listener= new IStreamListener() { |
| |
| @Override |
| public void streamAppended(final String text, final IStreamMonitor monitor) { |
| final Matcher matcher= RUtil.LINE_SEPARATOR_PATTERN.matcher(text); |
| if (matcher.find()) { |
| output.append(text.substring(0, matcher.start())); |
| found(); |
| } |
| else { |
| output.append(text); |
| } |
| } |
| |
| private void found() { |
| stream.removeListener(this); |
| patternFound.set(true); |
| } |
| |
| }; |
| synch(m); |
| try { |
| stream.addListener(listener); |
| if (m.isCanceled()) { |
| return null; |
| } |
| submitToConsole(command, m); |
| while (!patternFound.get()) { |
| if (m.isCanceled()) { |
| throw cancelTask(); |
| } |
| try { |
| Thread.sleep(50); |
| } |
| catch (final InterruptedException e) { |
| // continue directly |
| } |
| } |
| return output; |
| } |
| finally { |
| stream.removeListener(listener); |
| } |
| } |
| |
| } |