blob: f0d6111db7bd4a5cd12f4fc9df8b25321ba4cde4 [file] [log] [blame]
/*=============================================================================#
# Copyright (c) 2007, 2021 Stephan Wahlbrink and others.
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
# which is available at https://www.apache.org/licenses/LICENSE-2.0.
#
# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
#
# Contributors:
# Stephan Wahlbrink <sw@wahlbrink.eu> - initial API and implementation
#=============================================================================*/
package org.eclipse.statet.r.nico.impl;
import static org.eclipse.statet.nico.core.runtime.IToolEventHandler.RUN_BLOCKING_EVENT_ID;
import static org.eclipse.statet.nico.core.runtime.IToolEventHandler.RUN_RUNNABLE_DATA_KEY;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.eclipse.core.filesystem.EFS;
import org.eclipse.core.runtime.Path;
import org.eclipse.core.runtime.Platform;
import org.eclipse.debug.core.IStreamListener;
import org.eclipse.debug.core.model.IStreamMonitor;
import org.eclipse.statet.jcommons.lang.NonNullByDefault;
import org.eclipse.statet.jcommons.lang.Nullable;
import org.eclipse.statet.jcommons.status.ErrorStatus;
import org.eclipse.statet.jcommons.status.ProgressMonitor;
import org.eclipse.statet.jcommons.status.Status;
import org.eclipse.statet.jcommons.status.StatusException;
import org.eclipse.statet.jcommons.ts.core.BasicToolCommandData;
import org.eclipse.statet.jcommons.ts.core.ToolCommandData;
import org.eclipse.statet.jcommons.ts.core.ToolCommandHandler;
import org.eclipse.statet.jcommons.ts.core.ToolRunnable;
import org.eclipse.statet.jcommons.ts.core.ToolService;
import org.eclipse.statet.internal.r.console.core.RConsoleCorePlugin;
import org.eclipse.statet.internal.r.nico.RNicoMessages;
import org.eclipse.statet.nico.core.runtime.ConsoleService;
import org.eclipse.statet.nico.core.runtime.IRequireSynch;
import org.eclipse.statet.nico.core.runtime.Prompt;
import org.eclipse.statet.nico.core.runtime.SubmitType;
import org.eclipse.statet.nico.core.runtime.ToolStatus;
import org.eclipse.statet.nico.core.runtime.ToolStreamMonitor;
import org.eclipse.statet.nico.core.runtime.ToolStreamProxy;
import org.eclipse.statet.r.console.core.AbstractRController;
import org.eclipse.statet.r.console.core.RProcess;
import org.eclipse.statet.r.console.core.RWorkspace;
import org.eclipse.statet.r.core.RUtil;
/**
* Controller for RTerm.
*/
@NonNullByDefault
public class RTermController extends AbstractRController implements IRequireSynch {
private static final Pattern INT_OUTPUT_PATTERN= Pattern.compile("\\Q[1] \\E(\\d*)"); //$NON-NLS-1$
private static final Pattern STRING_OUTPUT_PATTERN= Pattern.compile("\\Q[1] \"\\E((?:\\Q\\\"\\E|[^\"])*)\\\""); //$NON-NLS-1$
private class ReadThread extends Thread {
private final InputStreamReader reader;
volatile int hasNoOutput;
private final int SYNC_COUNT= 2;
private final int SYNC_MS= 33;
final Lock streamLock= new ReentrantLock();
public ReadThread(final InputStream output) {
super("Rterm-Output Monitor"); //$NON-NLS-1$
this.reader= new InputStreamReader(output, RTermController.this.charset);
}
@Override
public void run() {
final ToolStreamProxy streams= getStreams();
boolean locked= false;
try {
boolean canRead= false;
final char[] b= new char[1024];
while (RTermController.this.process != null | (canRead= this.reader.ready())) {
if (canRead || this.hasNoOutput > this.SYNC_COUNT) {
if (!canRead && locked) {
this.streamLock.unlock();
locked= false;
}
int n= this.reader.read(b);
if (n > 0) {
this.hasNoOutput= 0;
if (!locked) {
this.streamLock.lock();
locked= true;
}
final String s= new String(b, 0, n);
streams.getOutputStreamMonitor().append(s, SubmitType.CONSOLE, 0);
n= s.length();
if (n >= 2 && s.charAt(--n) == ' ' && (s.charAt(--n) == '>' || s.charAt(n) == '+')) {
this.hasNoOutput++;
final Thread thread= getControllerThread();
if (thread != null) {
thread.interrupt();
}
}
continue;
}
else if (n < 0) {
onRTerminated();
return;
}
}
try {
Thread.sleep(this.SYNC_MS);
this.hasNoOutput++;
}
catch (final InterruptedException e) {
// continue directly
}
}
}
catch (final IOException e) {
onRTerminated();
return;
}
finally {
if (locked) {
this.streamLock.unlock();
locked= false;
}
try {
this.reader.close();
} catch (final IOException e1) {
}
}
}
private void onRTerminated() {
markAsTerminated();
synchronized (getQueue()) {
getQueue().notifyAll();
}
}
}
private final ProcessBuilder processConfig;
private final Charset charset;
private @Nullable Process process;
private OutputStreamWriter processInputWriter;
private ReadThread processOutputThread;
public RTermController(final RProcess process, final ProcessBuilder config, final Charset charset) {
super(process, null);
this.processConfig= config;
this.charset= charset;
setWorksapceData(new RWorkspace(this, null, null) {
@Override
protected void refreshFromTool(final AbstractRController controller, final int options,
final ProgressMonitor m) throws StatusException {
if ((options & RWorkspace.REFRESH_COMPLETE) != 0 || (options & RWorkspace.REFRESH_AUTO) == 0) {
final StringBuilder output= readOutputLine("getwd()", m); //$NON-NLS-1$
if (output != null) {
final Matcher matcher= STRING_OUTPUT_PATTERN.matcher(output);
if (matcher.find()) {
final String wd= matcher.group(1);
setWorkspaceDirL(EFS.getLocalFileSystem().getStore(new Path(wd)));
}
}
}
clearBriefedChanges();
}
});
setWorkspaceDirL(EFS.getLocalFileSystem().fromLocalFile(config.directory()));
initRunnableAdapterL();
}
@Override
protected ToolRunnable createStartRunnable() {
return new StartRunnable() {
@Override
public String getLabel() {
return RNicoMessages.Rterm_StartTask_label;
}
};
}
@Override
protected void startToolL(final ProgressMonitor m) throws StatusException {
OutputStream processInput= null;
try {
final List<Status> warnings= new ArrayList<>();
this.processConfig.redirectErrorStream(true);
final Process process= this.processConfig.start();
this.process= process;
this.processOutputThread= new ReadThread(process.getInputStream());
this.processOutputThread.start();
processInput= process.getOutputStream();
this.processInputWriter= new OutputStreamWriter(processInput, this.charset);
setCurrentPromptL(this.fDefaultPrompt);
initTracks(m, warnings);
if (!this.startupsRunnables.isEmpty()) {
getQueue().add(this.startupsRunnables);
this.startupsRunnables.clear();
}
scheduleControllerRunnable(new ControllerSystemRunnable(
"r/rj/start2", "Finish Initialization") { //$NON-NLS-1$
@Override
public void run(final ToolService service,
final ProgressMonitor m) throws StatusException {
for (final Status status : warnings) {
handleStatus(status, m);
}
}
});
}
catch (final IOException e) {
if (processInput != null) {
try {
processInput.close();
} catch (final IOException e1) {}
}
throw new StatusException(new ErrorStatus(RConsoleCorePlugin.BUNDLE_ID,
RNicoMessages.RTerm_error_Starting_message,
e ));
}
}
@Nullable Process getProcess() {
return this.process;
}
@Override
protected void interruptTool() {
runSendCtrlC();
}
@Override
protected void postCancelTask(final int options,
final ProgressMonitor m) throws StatusException {
this.fCurrentInput= ""; //$NON-NLS-1$
doSubmitL(m);
this.fCurrentInput= ""; //$NON-NLS-1$
doSubmitL(m);
}
@Override
protected void killTool(final ProgressMonitor m) {
final Process process= this.process;
if (process != null) {
process.destroy();
this.process= null;
}
}
@Override
protected boolean isToolAlive() {
final Process process= this.process;
if (process != null) {
try {
process.exitValue();
}
catch (final IllegalThreadStateException e) {
return true;
}
}
return false;
}
@Override
protected void clear() {
this.process= null;
super.clear();
}
private boolean runSendCtrlC() {
if (!Platform.getOS().startsWith("win") //$NON-NLS-1$
|| getStatus() == ToolStatus.TERMINATED) {
return false;
}
final ToolCommandHandler handler= getCommandHandler(RUN_BLOCKING_EVENT_ID);
if (handler != null) {
final RTermCancelRunnable cancelRunnable= new RTermCancelRunnable();
final ToolCommandData data= new BasicToolCommandData(Map.of(
RUN_RUNNABLE_DATA_KEY, cancelRunnable ));
final Status status= executeHandler(RUN_BLOCKING_EVENT_ID, handler, data, null);
return (status != null && status.getSeverity() == Status.OK);
}
return false;
}
//-- RunnabelAdapter
@Override
protected void doBeforeSubmitL() {
final ToolStreamProxy streams= getStreams();
final SubmitType submitType= getCurrentSubmitType();
// adds control stream
// without prompt
try {
this.processOutputThread.streamLock.lock();
streams.getInputStreamMonitor().append(this.fCurrentInput, submitType,
(this.fCurrentPrompt.meta & ConsoleService.META_HISTORY_DONTADD) );
streams.getInputStreamMonitor().append(getWorkspaceData().getLineSeparator(), submitType,
ConsoleService.META_HISTORY_DONTADD);
}
finally {
this.processOutputThread.streamLock.unlock();
}
}
@Override
protected void doSubmitL(final ProgressMonitor m) {
m.beginSubTask(this.fDefaultPrompt.text + " " + this.fCurrentInput); //$NON-NLS-1$
try {
this.processInputWriter.write(this.fCurrentInput + this.fLineSeparator);
this.processInputWriter.flush();
}
catch (final IOException e) {
RConsoleCorePlugin.logError("Rterm IO error", e); //$NON-NLS-1$
if (!isToolAlive()) {
markAsTerminated();
setCurrentPromptL(Prompt.NONE);
return;
}
}
try {
Thread.sleep(this.processOutputThread.SYNC_MS*2);
}
catch (final InterruptedException e) {
// continue directly
}
this.processOutputThread.streamLock.lock();
this.processOutputThread.streamLock.unlock();
setCurrentPromptL(this.fDefaultPrompt);
}
@Override
public Pattern synch(final ProgressMonitor m) throws StatusException {
final ToolStreamMonitor stream= getStreams().getOutputStreamMonitor();
final String stamp= "Synch"+System.nanoTime(); //$NON-NLS-1$
final AtomicBoolean patternFound= new AtomicBoolean(false);
final IStreamListener listener= new IStreamListener() {
private String lastLine= ""; //$NON-NLS-1$
@Override
public void streamAppended(final String text, final IStreamMonitor monitor) {
if (text.contains(stamp)) {
found();
return;
}
final String[] lines= RUtil.LINE_SEPARATOR_PATTERN.split(text, -1);
if ((this.lastLine + lines[0]).contains(stamp)) {
found();
return;
}
this.lastLine= lines[lines.length-1];
}
private void found() {
stream.removeListener(this);
patternFound.set(true);
}
};
try {
stream.addListener(listener);
submitToConsole("cat(\"" + stamp + "\\n\");", m); //$NON-NLS-1$ //$NON-NLS-2$
while (!patternFound.get()) {
if (m.isCanceled()) {
throw cancelTask();
}
try {
Thread.sleep(50);
}
catch (final InterruptedException e) {
// continue directly
}
}
return Pattern.compile("(?:" + Pattern.quote(getWorkspaceData().getDefaultPrompt().text) + ")?" + stamp); //$NON-NLS-1$ //$NON-NLS-2$
}
finally {
stream.removeListener(listener);
}
}
private @Nullable StringBuilder readOutputLine(final String command,
final ProgressMonitor m) throws StatusException {
final ToolStreamMonitor stream= getStreams().getOutputStreamMonitor();
final StringBuilder output= new StringBuilder();
final AtomicBoolean patternFound= new AtomicBoolean(false);
final IStreamListener listener= new IStreamListener() {
@Override
public void streamAppended(final String text, final IStreamMonitor monitor) {
final Matcher matcher= RUtil.LINE_SEPARATOR_PATTERN.matcher(text);
if (matcher.find()) {
output.append(text.substring(0, matcher.start()));
found();
}
else {
output.append(text);
}
}
private void found() {
stream.removeListener(this);
patternFound.set(true);
}
};
synch(m);
try {
stream.addListener(listener);
if (m.isCanceled()) {
return null;
}
submitToConsole(command, m);
while (!patternFound.get()) {
if (m.isCanceled()) {
throw cancelTask();
}
try {
Thread.sleep(50);
}
catch (final InterruptedException e) {
// continue directly
}
}
return output;
}
finally {
stream.removeListener(listener);
}
}
}