blob: 6c161d929b1197cb851cf0fe862d225e77c716b0 [file] [log] [blame]
/*=============================================================================#
# Copyright (c) 2007, 2019 Stephan Wahlbrink and others.
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
# which is available at https://www.apache.org/licenses/LICENSE-2.0.
#
# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
#
# Contributors:
# Stephan Wahlbrink <sw@wahlbrink.eu> - initial API and implementation
#=============================================================================*/
package org.eclipse.statet.r.nico.impl;
import static org.eclipse.statet.nico.core.runtime.IToolEventHandler.RUN_BLOCKING_EVENT_ID;
import static org.eclipse.statet.nico.core.runtime.IToolEventHandler.RUN_RUNNABLE_DATA_KEY;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.eclipse.core.filesystem.EFS;
import org.eclipse.core.runtime.Path;
import org.eclipse.core.runtime.Platform;
import org.eclipse.debug.core.IStreamListener;
import org.eclipse.debug.core.model.IStreamMonitor;
import org.eclipse.statet.jcommons.status.ErrorStatus;
import org.eclipse.statet.jcommons.status.ProgressMonitor;
import org.eclipse.statet.jcommons.status.Status;
import org.eclipse.statet.jcommons.status.StatusException;
import org.eclipse.statet.jcommons.ts.core.SystemRunnable;
import org.eclipse.statet.jcommons.ts.core.ToolCommandHandler;
import org.eclipse.statet.jcommons.ts.core.ToolRunnable;
import org.eclipse.statet.jcommons.ts.core.ToolService;
import org.eclipse.statet.internal.r.console.core.RConsoleCorePlugin;
import org.eclipse.statet.internal.r.nico.RNicoMessages;
import org.eclipse.statet.nico.core.runtime.ConsoleService;
import org.eclipse.statet.nico.core.runtime.IRequireSynch;
import org.eclipse.statet.nico.core.runtime.Prompt;
import org.eclipse.statet.nico.core.runtime.SubmitType;
import org.eclipse.statet.nico.core.runtime.ToolStatus;
import org.eclipse.statet.nico.core.runtime.ToolStreamMonitor;
import org.eclipse.statet.nico.core.runtime.ToolStreamProxy;
import org.eclipse.statet.r.console.core.AbstractRController;
import org.eclipse.statet.r.console.core.RProcess;
import org.eclipse.statet.r.console.core.RWorkspace;
import org.eclipse.statet.r.core.RUtil;
/**
* Controller for RTerm.
*/
public class RTermController extends AbstractRController implements IRequireSynch {
/**
 * Finds the literal text {@code [1] } followed by a run of digits; the digits are
 * captured as group 1 (the group may be empty because the quantifier allows zero digits).
 * Used in this class to parse the console output of {@code Sys.getpid()}.
 */
private static final Pattern INT_OUTPUT_PATTERN = Pattern.compile("\\Q[1] \\E(\\d*)"); //$NON-NLS-1$
/**
 * Finds the literal text {@code [1] "} followed by a quoted string body and the closing
 * double quote; the body (group 1) consists of backslash-escaped double quotes and any
 * characters other than a double quote. The body is captured as printed, i.e. escape
 * sequences are not resolved.
 * Used in this class to parse the console output of {@code getwd()}.
 */
private static final Pattern STRING_OUTPUT_PATTERN = Pattern.compile("\\Q[1] \"\\E((?:\\Q\\\"\\E|[^\"])*)\\\""); //$NON-NLS-1$
/**
 * Thread which reads the output of the Rterm process and appends it to the output
 * stream monitor of the controller.
 * <p>
 * While output is arriving the thread holds {@link #streamLock}; the lock is released
 * after more than {@code SYNC_COUNT} idle rounds, directly before the next read.
 * Code of the enclosing controller acquires the same lock to wait until
 * the pending output has been forwarded.</p>
 */
private class ReadThread extends Thread {
	
	/**
	 * Idle counter: reset to 0 when output was read, incremented after each sleep
	 * round and when a chunk ends with a prompt-like suffix.
	 */
	volatile int hasNoOutput;
	/** Number of idle rounds which must be exceeded before the lock is released and a read is attempted without {@code ready()}. */
	private final int SYNC_COUNT = 2;
	/** Sleep time in milliseconds of a single idle round. */
	private final int SYNC_MS = 33;
	/** Lock held by this thread while it is forwarding output. */
	final Lock streamLock = new ReentrantLock();
	
	public ReadThread() {
		super("Rterm-Output Monitor"); //$NON-NLS-1$
	}
	
	/**
	 * Polls the process output until the process reference is cleared and no more
	 * output is ready, the end of the stream is reached or an I/O error occurs.
	 * The reader is closed when the loop ends.
	 */
	@Override
	public void run() {
		final ToolStreamProxy streams = getStreams();
		boolean locked = false;
		try {
			boolean canRead = false;
			final char[] b = new char[1024];
			// non-short-circuit '|' on purpose: canRead has to be refreshed in every round
			while (fProcess != null | (canRead = fProcessOutputReader.ready())) {
				// The buffer field is not guaranteed to be set (it depends on the type of
				// the process stream), so guard against null instead of failing with a NPE.
				// NOTE(review): the result is unused; presumably the call only probes that
				// the stream is still open -- confirm.
				final BufferedInputStream buffer = fProcessOutputBuffer;
				if (buffer != null) {
					buffer.available();
				}
				if (canRead || this.hasNoOutput > this.SYNC_COUNT) {
					if (!canRead && locked) {
						// idle for a while: let waiting threads pass before the read,
						// which may block
						this.streamLock.unlock();
						locked = false;
					}
					int n = fProcessOutputReader.read(b);
					if (n > 0) {
						this.hasNoOutput = 0;
						if (!locked) {
							this.streamLock.lock();
							locked = true;
						}
						final String s = new String(b, 0, n);
						streams.getOutputStreamMonitor().append(s, SubmitType.CONSOLE, 0);
						n = s.length();
						// chunk ends with "> " or "+ ": looks like a prompt
						if (n >= 2 && s.charAt(--n) == ' ' && (s.charAt(--n) == '>' || s.charAt(n) == '+')) {
							this.hasNoOutput++;
							// wake up the controller thread if it is sleeping
							final Thread thread = getControllerThread();
							if (thread != null) {
								thread.interrupt();
							}
						}
						continue;
					}
					else if (n < 0) {
						// end of stream
						onRTerminated();
						return;
					}
				}
				try {
					Thread.sleep(this.SYNC_MS);
					this.hasNoOutput++;
				}
				catch (final InterruptedException e) {
					// continue directly
				}
			}
		}
		catch (final IOException e) {
			onRTerminated();
			return;
		}
		finally {
			if (locked) {
				this.streamLock.unlock();
				locked = false;
			}
			try {
				fProcessOutputReader.close();
			} catch (final IOException e1) {
				// best effort, the stream is not used any longer
			}
		}
	}
	
	/** Marks the tool as terminated and wakes up all threads waiting on the queue. */
	private void onRTerminated() {
		markAsTerminated();
		synchronized (getQueue()) {
			getQueue().notifyAll();
		}
	}
	
}
/**
 * System runnable which evaluates {@code Sys.getpid()} in the console and stores the
 * parsed result in {@code fProcessId}.
 * <p>
 * If no output line is available or the line does not match
 * {@code INT_OUTPUT_PATTERN}, the current value of {@code fProcessId} is kept; if the
 * matched text cannot be parsed as a number, {@code fProcessId} is set to {@code null}.</p>
 */
private class UpdateProcessIdTask extends ControllerSystemRunnable implements SystemRunnable {
	
	public UpdateProcessIdTask() {
		super("r/rterm/fetch-process-id", "Fetch Process Id"); //$NON-NLS-1$
	}
	
	@Override
	public void run(final ToolService service, final ProgressMonitor m) throws StatusException {
		final StringBuilder line = readOutputLine("Sys.getpid()", m); //$NON-NLS-1$
		if (line == null) {
			return;
		}
		final Matcher idMatcher = INT_OUTPUT_PATTERN.matcher(line);
		if (!idMatcher.find()) {
			return;
		}
		fProcessId = parseId(idMatcher.group(1));
	}
	
	/** Converts the captured digits to a number, or returns {@code null} if that is not possible. */
	private Long parseId(final String digits) {
		if (digits == null) {
			return null;
		}
		try {
			return Long.valueOf(digits);
		}
		catch (final NumberFormatException e) {
			return null;
		}
	}
	
}
/** Process configuration used to start the Rterm process; passed to the constructor. */
private final ProcessBuilder fConfig;
/** Charset used for the reader and the writer connected to the process. */
private final Charset fCharset;
/** The started process; reset to {@code null} by {@code killTool} and {@code clear}. */
private Process fProcess;
/** Writer connected to the input stream of the process (submitted commands). */
private OutputStreamWriter fProcessInputWriter;
/** Buffered stream of the process output, assigned when the tool is started. */
private BufferedInputStream fProcessOutputBuffer;
/** Reader decoding the process output (stderr is redirected into it at start) with {@code fCharset}. */
private InputStreamReader fProcessOutputReader;
/** Thread forwarding the process output to the output stream monitor. */
private ReadThread fProcessOutputThread;
/** Process id as reported by {@code Sys.getpid()}, or {@code null} if not available. */
Long fProcessId;
/**
 * Creates a new controller for an Rterm process.
 * <p>
 * Installs a workspace whose refresh reads the working directory from R using
 * {@code getwd()}, and initializes the workspace directory with the directory of the
 * process configuration.</p>
 * 
 * @param process the tool process passed on to the super class
 * @param config the configuration used to start the Rterm process
 * @param charset the charset for the communication with the Rterm process
 */
public RTermController(final RProcess process, final ProcessBuilder config, final Charset charset) {
	super(process, null);
	fConfig = config;
	fCharset = charset;
	
	final RWorkspace workspace = new RWorkspace(this, null, null) {
		@Override
		protected void refreshFromTool(final AbstractRController controller, final int options,
				final ProgressMonitor m) throws StatusException {
			final boolean isComplete = ((options & RWorkspace.REFRESH_COMPLETE) != 0);
			final boolean isAuto = ((options & RWorkspace.REFRESH_AUTO) != 0);
			// the directory is fetched for complete refreshes and for all non-auto refreshes
			if (isComplete || !isAuto) {
				final StringBuilder line = readOutputLine("getwd()", m); //$NON-NLS-1$
				final Matcher dirMatcher = (line != null) ?
						STRING_OUTPUT_PATTERN.matcher(line) : null;
				if (dirMatcher != null && dirMatcher.find()) {
					final String wd = dirMatcher.group(1);
					setWorkspaceDirL(EFS.getLocalFileSystem().getStore(new Path(wd)));
				}
			}
			clearBriefedChanges();
		}
	};
	setWorksapceData(workspace);
	
	setWorkspaceDirL(EFS.getLocalFileSystem().fromLocalFile(config.directory()));
	initRunnableAdapterL();
}
/**
 * Creates the runnable starting the tool; it is a {@code StartRunnable} labeled with
 * the Rterm specific start task label.
 */
@Override
protected ToolRunnable createStartRunnable() {
	final ToolRunnable startRunnable = new StartRunnable() {
		@Override
		public String getLabel() {
			return RNicoMessages.Rterm_StartTask_label;
		}
	};
	return startRunnable;
}
/**
 * Starts the Rterm process and connects the controller to its streams.
 * <p>
 * The error stream of the process is merged into its output stream. The output is read
 * by a new {@code ReadThread}; commands are written using a writer with the configured
 * charset. Finally the task fetching the process id, the startup runnables and a
 * runnable reporting the warnings collected by {@code initTracks} are scheduled.</p>
 * 
 * @param m the progress monitor
 * @throws StatusException if an I/O error occurs while starting the process
 */
@Override
protected void startToolL(final ProgressMonitor m) throws StatusException {
	OutputStream processInput = null;
	InputStream processOutput;
	try {
		fConfig.redirectErrorStream(true);
		fProcess = fConfig.start();
		processOutput = fProcess.getInputStream();
		// Always provide a buffer, so that fProcessOutputBuffer is never left null
		// (or pointing to the stream of an earlier process) for the thread
		// monitoring the output.
		fProcessOutputBuffer = (processOutput instanceof BufferedInputStream) ?
				(BufferedInputStream) processOutput :
				new BufferedInputStream(processOutput);
		fProcessOutputReader = new InputStreamReader(fProcessOutputBuffer, fCharset);
		fProcessOutputThread = new ReadThread();
		fProcessOutputThread.start();
		processInput = fProcess.getOutputStream();
		fProcessInputWriter = new OutputStreamWriter(processInput, fCharset);
		setCurrentPromptL(fDefaultPrompt);
		
		final List<Status> warnings= new ArrayList<>();
		initTracks(fConfig.directory().toString(), m, warnings);
		
		getQueue().add(new UpdateProcessIdTask());
		if (!this.startupsRunnables.isEmpty()) {
			getQueue().add(this.startupsRunnables);
			this.startupsRunnables.clear();
		}
		
		// report the collected warnings when the controller processes its runnables
		scheduleControllerRunnable(new ControllerSystemRunnable(
				"r/rj/start2", "Finish Initialization") { //$NON-NLS-1$
			@Override
			public void run(final ToolService service,
					final ProgressMonitor m) throws StatusException {
				for (final Status status : warnings) {
					handleStatus(status, m);
				}
			}
		});
	}
	catch (final IOException e) {
		if (processInput != null) {
			try {
				processInput.close();
			} catch (final IOException e1) {
				// best effort, the start error is reported below
			}
		}
		throw new StatusException(new ErrorStatus(RConsoleCorePlugin.BUNDLE_ID,
				RNicoMessages.RTerm_error_Starting_message,
				e ));
	}
}
/**
 * Interrupts the tool by delegating to {@code runSendCtrlC()}; the result of the
 * attempt is ignored.
 */
@Override
protected void interruptTool() {
	this.runSendCtrlC();
}
/**
 * Submits two empty input lines to the process after a task was canceled.
 * NOTE(review): presumably this brings R back to a clean prompt -- confirm.
 * 
 * @param options the cancel options (not used here)
 * @param m the progress monitor
 */
@Override
protected void postCancelTask(final int options,
		final ProgressMonitor m) throws StatusException {
	for (int i = 0; i < 2; i++) {
		fCurrentInput = ""; //$NON-NLS-1$
		doSubmitL(m);
	}
}
/**
 * Destroys the Rterm process, if there is one, and clears the process reference.
 * 
 * @param m the progress monitor (not used here)
 */
@Override
protected void killTool(final ProgressMonitor m) {
	final Process process = fProcess;
	if (process == null) {
		return;
	}
	process.destroy();
	fProcess = null;
}
/**
 * Checks whether the Rterm process is still running.
 * 
 * @return {@code true} if there is a process and it has not yet exited,
 *     otherwise {@code false}
 */
@Override
protected boolean isToolAlive() {
	final Process process = fProcess;
	if (process == null) {
		return false;
	}
	try {
		process.exitValue();
		// an exit value is available: the process has terminated
		return false;
	}
	catch (final IllegalThreadStateException stillRunning) {
		return true;
	}
}
/** Drops the reference to the process, then performs the clean-up of the super class. */
@Override
protected void clear() {
	this.fProcess = null;
	super.clear();
}
/**
 * Tries to cancel the current R task by running an {@code RTermCancelRunnable} via
 * the command handler registered for {@code RUN_BLOCKING_EVENT_ID}.
 * <p>
 * Nothing is done if the id of the operating system does not start with
 * {@code "win"}, if the tool is terminated or if no handler is available.</p>
 * 
 * @return {@code true} if the handler was executed and returned a status with
 *     severity OK, otherwise {@code false}
 */
private boolean runSendCtrlC() {
	final boolean isWindows = Platform.getOS().startsWith("win"); //$NON-NLS-1$
	if (!isWindows || getStatus() == ToolStatus.TERMINATED) {
		return false;
	}
	
	final ToolCommandHandler handler = getCommandHandler(RUN_BLOCKING_EVENT_ID);
	if (handler == null) {
		return false;
	}
	
	final Map<String, Object> data = Collections.<String, Object>singletonMap(
			RUN_RUNNABLE_DATA_KEY, new RTermCancelRunnable() );
	final Status status = executeHandler(RUN_BLOCKING_EVENT_ID, handler, data, null);
	if (status == null) {
		return false;
	}
	return (status.getSeverity() == Status.OK);
}
//-- RunnableAdapter
/**
 * Appends the current input followed by a line separator to the input stream monitor
 * before the input is submitted to the process.
 * <p>
 * The stream lock of the output thread is held while appending. The lock is acquired
 * before entering the {@code try} block, so that the {@code finally} clause never
 * unlocks a lock which was not acquired.</p>
 */
@Override
protected void doBeforeSubmitL() {
	final ToolStreamProxy streams = getStreams();
	final SubmitType submitType = getCurrentSubmitType();
	// adds control stream
	// without prompt
	final Lock streamLock = fProcessOutputThread.streamLock;
	streamLock.lock();
	try {
		streams.getInputStreamMonitor().append(fCurrentInput, submitType,
				(fCurrentPrompt.meta & ConsoleService.META_HISTORY_DONTADD) );
		streams.getInputStreamMonitor().append(getWorkspaceData().getLineSeparator(), submitType,
				ConsoleService.META_HISTORY_DONTADD);
	}
	finally {
		streamLock.unlock();
	}
}
/**
 * Writes the current input followed by the line separator to the process, then waits
 * for the output thread before the default prompt is set again.
 * <p>
 * If writing fails, the error is logged; if the tool is no longer alive, it is marked
 * as terminated, the prompt is set to {@code Prompt.NONE} and the method returns.
 * Otherwise the method sleeps for two sync intervals of the output thread (an
 * interrupt ends the sleep early) and then acquires and immediately releases the
 * stream lock, i.e. it blocks as long as the output thread holds that lock.</p>
 * 
 * @param m the progress monitor, used to show the submitted input as sub task
 */
@Override
protected void doSubmitL(final ProgressMonitor m) {
	m.beginSubTask(fDefaultPrompt.text + " " + fCurrentInput); //$NON-NLS-1$
	
	try {
		final String line = fCurrentInput + fLineSeparator;
		fProcessInputWriter.write(line);
		fProcessInputWriter.flush();
	}
	catch (final IOException e) {
		RConsoleCorePlugin.logError("Rterm IO error", e); //$NON-NLS-1$
		if (!isToolAlive()) {
			markAsTerminated();
			setCurrentPromptL(Prompt.NONE);
			return;
		}
	}
	
	// give the process some time to respond
	try {
		final long pause = fProcessOutputThread.SYNC_MS*2;
		Thread.sleep(pause);
	}
	catch (final InterruptedException e) {
		// continue directly
	}
	
	// pass the lock once: blocks while the output thread is still holding it
	final Lock outputLock = fProcessOutputThread.streamLock;
	outputLock.lock();
	outputLock.unlock();
	
	setCurrentPromptL(fDefaultPrompt);
}
/**
 * Synchronizes with the console output: submits a {@code cat()} command printing a
 * unique stamp and waits until the stamp appears in the output stream.
 * <p>
 * The output is delivered in arbitrary chunks, therefore the listener keeps the tail
 * of the current (unterminated) output line and checks it together with the next
 * chunk. The tail is accumulated over chunks which contain no line separator, so
 * that the stamp is also found if it is split over more than two chunks; it is
 * limited to the number of chars which can still be the beginning of the stamp.</p>
 * 
 * @param m the progress monitor, checked for cancellation while waiting
 * @return a pattern matching the stamp, optionally preceded by the default prompt
 * @throws StatusException if the monitor is canceled while waiting
 */
@Override
public Pattern synch(final ProgressMonitor m) throws StatusException {
	final ToolStreamMonitor stream = getStreams().getOutputStreamMonitor();
	final String stamp = "Synch"+System.nanoTime(); //$NON-NLS-1$
	final AtomicBoolean patternFound = new AtomicBoolean(false);
	final IStreamListener listener = new IStreamListener() {
		
		/** Tail of the output line not yet terminated by a line separator. */
		private String lastLine = ""; //$NON-NLS-1$
		
		@Override
		public void streamAppended(final String text, final IStreamMonitor monitor) {
			if (text.contains(stamp)) {
				found();
				return;
			}
			final String[] lines = RUtil.LINE_SEPARATOR_PATTERN.split(text, -1);
			final String continued = this.lastLine + lines[0];
			if (continued.contains(stamp)) {
				found();
				return;
			}
			// without a line separator in the chunk the current line is still open:
			// keep what was collected so far instead of replacing it
			final String open = (lines.length == 1) ? continued : lines[lines.length-1];
			// only the last (stamp.length - 1) chars can be the start of the stamp
			final int maxTail = stamp.length() - 1;
			this.lastLine = (open.length() > maxTail) ?
					open.substring(open.length() - maxTail) : open;
		}
		
		private void found() {
			stream.removeListener(this);
			patternFound.set(true);
		}
		
	};
	try {
		stream.addListener(listener);
		submitToConsole("cat(\""+stamp+"\\n\");", m); //$NON-NLS-1$ //$NON-NLS-2$
		while (!patternFound.get()) {
			if (m.isCanceled()) {
				throw cancelTask();
			}
			try {
				Thread.sleep(50);
			}
			catch (final InterruptedException e) {
				// continue directly
			}
		}
		return Pattern.compile("(?:"+Pattern.quote(getWorkspaceData().getDefaultPrompt().text) + ")?"+stamp); //$NON-NLS-1$ //$NON-NLS-2$
	}
	finally {
		stream.removeListener(listener);
	}
}
/**
 * Submits the command to the console and collects the output up to the first line
 * separator.
 * <p>
 * The console is synchronized (see {@code synch}) before the command is submitted.
 * The returned text is everything which was appended to the output stream after the
 * listener was added and before the first line separator.</p>
 * 
 * @param command the R command to submit
 * @param m the progress monitor, checked for cancellation
 * @return the collected output, or {@code null} if the monitor was already canceled
 *     before the command was submitted
 * @throws StatusException if the monitor is canceled while waiting for the output
 */
private StringBuilder readOutputLine(final String command,
		final ProgressMonitor m) throws StatusException {
	final ToolStreamMonitor outputMonitor = getStreams().getOutputStreamMonitor();
	final StringBuilder firstLine = new StringBuilder();
	final AtomicBoolean lineComplete = new AtomicBoolean(false);
	final IStreamListener collector = new IStreamListener() {
		@Override
		public void streamAppended(final String text, final IStreamMonitor monitor) {
			final Matcher separator = RUtil.LINE_SEPARATOR_PATTERN.matcher(text);
			if (!separator.find()) {
				// line not yet complete
				firstLine.append(text);
				return;
			}
			firstLine.append(text, 0, separator.start());
			outputMonitor.removeListener(this);
			lineComplete.set(true);
		}
	};
	
	synch(m);
	try {
		outputMonitor.addListener(collector);
		if (m.isCanceled()) {
			return null;
		}
		submitToConsole(command, m);
		while (!lineComplete.get()) {
			if (m.isCanceled()) {
				throw cancelTask();
			}
			try {
				Thread.sleep(50);
			}
			catch (final InterruptedException e) {
				// continue directly
			}
		}
		return firstLine;
	}
	finally {
		outputMonitor.removeListener(collector);
	}
}
}