| /******************************************************************************* |
| * Copyright (c) 2011, 2014 Wind River Systems, Inc. and others. All rights reserved. |
| * This program and the accompanying materials are made available under the terms |
| * of the Eclipse Public License v1.0 which accompanies this distribution, and is |
| * available at http://www.eclipse.org/legal/epl-v10.html |
| * |
| * Contributors: |
| * Wind River Systems - initial API and implementation |
| *******************************************************************************/ |
| package org.eclipse.tcf.te.tcf.processes.core.launcher; |
| |
| import java.io.IOException; |
| import java.io.Writer; |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.concurrent.CancellationException; |
| |
| import org.eclipse.core.runtime.Assert; |
| import org.eclipse.core.runtime.IStatus; |
| import org.eclipse.core.runtime.Platform; |
| import org.eclipse.core.runtime.Status; |
| import org.eclipse.osgi.util.NLS; |
| import org.eclipse.tcf.protocol.IChannel; |
| import org.eclipse.tcf.protocol.IToken; |
| import org.eclipse.tcf.protocol.Protocol; |
| import org.eclipse.tcf.services.IProcesses; |
| import org.eclipse.tcf.services.IProcessesV1; |
| import org.eclipse.tcf.services.IStreams; |
| import org.eclipse.tcf.te.runtime.callback.AsyncCallbackCollector; |
| import org.eclipse.tcf.te.runtime.callback.Callback; |
| import org.eclipse.tcf.te.runtime.interfaces.IDisposable; |
| import org.eclipse.tcf.te.runtime.interfaces.callback.ICallback; |
| import org.eclipse.tcf.te.tcf.core.Tcf; |
| import org.eclipse.tcf.te.tcf.core.async.CallbackInvocationDelegate; |
| import org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager; |
| import org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.IStreamsListenerProxy; |
| import org.eclipse.tcf.te.tcf.core.streams.StreamsDataProvider; |
| import org.eclipse.tcf.te.tcf.core.streams.StreamsDataReceiver; |
| import org.eclipse.tcf.te.tcf.core.util.ExceptionUtils; |
| import org.eclipse.tcf.te.tcf.processes.core.activator.CoreBundleActivator; |
| import org.eclipse.tcf.te.tcf.processes.core.interfaces.launcher.IProcessContextAwareListener; |
| import org.eclipse.tcf.te.tcf.processes.core.interfaces.tracing.ITraceIds; |
| import org.eclipse.tcf.te.tcf.processes.core.nls.Messages; |
| import org.eclipse.tcf.util.TCFTask; |
| |
| /** |
| * Remote process streams listener implementation. |
| */ |
| public class ProcessStreamsListener implements IChannelManager.IStreamsListener, IProcessContextAwareListener, IDisposable { |
| // The channel instance |
| /* default */ IChannel channel; |
| // The streams service instance |
| /* default */ IStreams svcStreams; |
| // The processes service name |
| /* default */ String svcProcessesName; |
| // The remote process context |
| private IProcesses.ProcessContext context; |
| // The list of registered stream data receivers |
| private final List<StreamsDataReceiver> dataReceiver = new ArrayList<StreamsDataReceiver>(); |
| // The stream data provider |
| private StreamsDataProvider dataProvider; |
| // The list of created runnable's |
| private final List<Runnable> runnables = new ArrayList<Runnable>(); |
| // The streams listener proxy instance |
| private IChannelManager.IStreamsListenerProxy proxy = null; |
| // The list of already processed streams created events (simple string in format "<stream type>;<stream id>;<context id>") |
| /* default */ List<String> processedCreatedEvents = new ArrayList<String>(); |
| |
| /** |
| * Remote process stream reader runnable implementation. The |
| * runnable will be executed within a thread and is responsible to read the |
| * incoming data from the associated stream and forward them to the registered receivers. |
| */ |
| protected class StreamReaderRunnable implements Runnable { |
| // The associated stream id |
| /* default */ final String streamId; |
| // The associated stream type id |
| private final String streamTypeId; |
| // The list of receivers applicable for the associated stream type id |
| private final List<StreamsDataReceiver> receivers = new ArrayList<StreamsDataReceiver>(); |
| // The currently active read task |
| private TCFTask<ReadData> activeTask; |
| // The callback to invoke if the runnable stopped |
| private ICallback callback; |
| |
| // Flag to stop the runnable |
| private boolean stopped = false; |
| |
| /** |
| * Immutable class describing the result returned by {@link StreamReaderRunnable#read(IStreams, String, int)}. |
| */ |
| protected class ReadData { |
| /** |
| * The number of lost bytes in case of a buffer overflow. If <code>-1</code>, |
| * an unknown number of bytes were lost. If non-zero and <code>data.length</code> is |
| * non-zero, the lost bytes are considered located right before the read bytes. |
| */ |
| public final int lostBytes; |
| /** |
| * The read data as byte array. |
| */ |
| public final byte[] data; |
| /** |
| * Flag to signal if the end of the stream has been reached. |
| */ |
| public final boolean eos; |
| |
| /** |
| * Constructor. |
| */ |
| public ReadData(int lostBytes, byte[] data, boolean eos) { |
| this.lostBytes = lostBytes; |
| this.data = data; |
| this.eos = eos; |
| } |
| } |
| |
| /** |
| * Constructor. |
| * |
| * @param streamId The associated stream id. Must not be <code>null</code>. |
| * @param streamTypeId The associated stream type id. Must not be <code>null</code>. |
| * @param receivers The list of registered data receivers. Must not be <code>null</code>. |
| */ |
| public StreamReaderRunnable(String streamId, String streamTypeId, StreamsDataReceiver[] receivers) { |
| Assert.isNotNull(streamId); |
| Assert.isNotNull(streamTypeId); |
| Assert.isNotNull(receivers); |
| |
| this.streamId = streamId; |
| this.streamTypeId = streamTypeId; |
| |
| // Loop the list of receivers and filter out the applicable ones |
| for (StreamsDataReceiver receiver : receivers) { |
| if (receiver.isApplicable(this.streamTypeId)) |
| this.receivers.add(receiver); |
| } |
| } |
| |
| /** |
| * Returns the associated stream id. |
| * |
| * @return The associated stream id. |
| */ |
| public final String getStreamId() { |
| return streamId; |
| } |
| |
| /** |
| * Returns if or if not the list of applicable receivers is empty. |
| * |
| * @return <code>True</code> if the list of applicable receivers is empty, <code>false</code> otherwise. |
| */ |
| public final boolean isEmpty() { |
| return receivers.isEmpty(); |
| } |
| |
| /** |
| * Stop the runnable. |
| * |
| * @param callback The callback to invoke if the runnable stopped. |
| */ |
| public final synchronized void stop(ICallback callback) { |
| // If the runnable is stopped already, invoke the callback directly |
| if (stopped) { |
| if (callback != null) callback.done(this, Status.OK_STATUS); |
| return; |
| } |
| |
| // Store the callback instance |
| this.callback = callback; |
| // Mark the runnable as stopped |
| stopped = true; |
| } |
| |
| /** |
| * Returns if the runnable should stop. |
| */ |
| protected final synchronized boolean isStopped() { |
| return stopped; |
| } |
| |
| /** |
| * Sets the currently active reader task. |
| * |
| * @param task The currently active reader task or <code>null</code>. |
| */ |
| protected final void setActiveTask(TCFTask<ReadData> task) { |
| activeTask = task; |
| } |
| |
| /** |
| * Returns the currently active reader task. |
| * |
| * @return The currently active reader task or <code>null</code>. |
| */ |
| protected final TCFTask<ReadData> getActiveTask() { |
| return activeTask; |
| } |
| |
| /* (non-Javadoc) |
| * @see java.lang.Runnable#run() |
| */ |
| @Override |
| public void run() { |
| // Create a snapshot of the receivers |
| final StreamsDataReceiver[] receivers = this.receivers.toArray(new StreamsDataReceiver[this.receivers.size()]); |
| |
| // Run until stopped and the streams service is available |
| while (!isStopped() && svcStreams != null) { |
| try { |
| ReadData streamData = read(svcStreams, streamId, 1024); |
| if (streamData != null) { |
| // Check if the received data contains some stream data |
| if (streamData.data != null) { |
| // Notify the data receivers about the new received data |
| notifyReceiver(new String(streamData.data), receivers); |
| } |
| // If the end of the stream have been reached --> break out |
| if (streamData.eos) { |
| break; |
| } |
| } |
| } catch (Exception e) { |
| // An error occurred -> Dump to the error log |
| e = ExceptionUtils.checkAndUnwrapException(e); |
| // Check if the blocking read task got canceled |
| if (!isStopped() && !(e instanceof CancellationException)) { |
| // Log the error to the user, might be something serious |
| IStatus status = new Status(IStatus.ERROR, CoreBundleActivator.getUniqueIdentifier(), |
| NLS.bind(Messages.ProcessStreamReaderRunnable_error_readFailed, streamId, e.getLocalizedMessage()), |
| e); |
| Platform.getLog(CoreBundleActivator.getContext().getBundle()).log(status); |
| } |
| // break out of the loop |
| break; |
| } |
| } |
| |
| // Disconnect from the stream |
| if (svcStreams != null) { |
| Runnable runnable = new Runnable() { |
| @Override |
| public void run() { |
| svcStreams.disconnect(streamId, new IStreams.DoneDisconnect() { |
| @Override |
| @SuppressWarnings("synthetic-access") |
| public void doneDisconnect(IToken token, Exception error) { |
| synchronized (this) { |
| // Mark the runnable definitely stopped |
| stopped = true; |
| // Disconnect is done, ignore any error, invoke the callback |
| if (callback != null) callback.done(this, Status.OK_STATUS); |
| } |
| } |
| }); |
| } |
| }; |
| |
| Protocol.invokeLater(runnable); |
| } else { |
| synchronized (this) { |
| // Mark the runnable definitely stopped |
| stopped = true; |
| // Invoke the callback directly, if any |
| if (callback != null) callback.done(this, Status.OK_STATUS); |
| } |
| } |
| // Make sure that data receivers are disposed before leaving the thread. |
| // This will closes the PipedOutputStream wrapped by the writer in the data receivers, |
| // thus the PipedInputStream read an EOS and terminate gracefully. |
| for(StreamsDataReceiver receiver:receivers) { |
| receiver.dispose(); |
| } |
| } |
| |
| /** |
| * Reads data from the stream and blocks until some data has been received. |
| * |
| * @param service The streams service. Must not be <code>null</code>. |
| * @param streamId The stream id. Must not be <code>null</code>. |
| * @param size The size of the data to read. |
| * |
| * @return The read data. |
| * |
| * @throws Exception In case the read fails. |
| */ |
| protected final ReadData read(final IStreams service, final String streamId, final int size) throws Exception { |
| Assert.isNotNull(service); |
| Assert.isNotNull(streamId); |
| Assert.isTrue(!Protocol.isDispatchThread()); |
| |
| // Create the task object |
| TCFTask<ReadData> task = new TCFTask<ReadData>(channel) { |
| @Override |
| public void run() { |
| service.read(streamId, size, new IStreams.DoneRead() { |
| /* (non-Javadoc) |
| * @see org.eclipse.tcf.services.IStreams.DoneRead#doneRead(org.eclipse.tcf.protocol.IToken, java.lang.Exception, int, byte[], boolean) |
| */ |
| @Override |
| public void doneRead(IToken token, Exception error, int lostSize, byte[] data, boolean eos) { |
| if (error == null) done(new ReadData(lostSize, data, eos)); |
| else if (!isStopped()) |
| error(error); |
| } |
| }); |
| } |
| }; |
| |
| // Push the task object to the runnable instance |
| setActiveTask(task); |
| |
| // Block until some data is received |
| return task.get(); |
| } |
| |
| /** |
| * Notify the data receiver that some data has been received. |
| * |
| * @param data The data or <code>null</code>. |
| */ |
| protected final void notifyReceiver(final String data, final StreamsDataReceiver[] receivers) { |
| if (data == null) return; |
| // Notify the data receiver |
| for (StreamsDataReceiver receiver : receivers) { |
| try { |
| // Get the writer |
| Writer writer = receiver.getWriter(); |
| // Append the data |
| writer.write(data); |
| // And flush it |
| writer.flush(); |
| // Notify potential listeners |
| receiver.notifyListener(data); |
| } catch (IOException e) { |
| if (CoreBundleActivator.getTraceHandler().isSlotEnabled(1, null)) { |
| IStatus status = new Status(IStatus.WARNING, CoreBundleActivator.getUniqueIdentifier(), |
| NLS.bind(Messages.ProcessStreamReaderRunnable_error_appendFailed, streamId, data), |
| e); |
| Platform.getLog(CoreBundleActivator.getContext().getBundle()).log(status); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Default TCF remote process stream writer runnable implementation. The |
| * runnable will be executed within a thread and is responsible to read the |
| * incoming data from the registered providers and forward them to the associated stream. |
| */ |
| protected class StreamWriterRunnable implements Runnable { |
| // The associated stream id |
| /* default */ final String streamId; |
| // The associated stream type id |
| private final String streamTypeId; |
| // The data provider applicable for the associated stream type id |
| private final StreamsDataProvider provider; |
| // The currently active write task |
| private TCFTask<Object> activeTask; |
| // The callback to invoke if the runnable stopped |
| private ICallback callback; |
| |
| // Flag to stop the runnable |
| private boolean stopped = false; |
| |
| /** |
| * Constructor. |
| * |
| * @param streamId The associated stream id. Must not be <code>null</code>. |
| * @param streamTypeId The associated stream type id. Must not be <code>null</code>. |
| * @param provider The data provider. Must not be <code>null</code> and must be applicable for the stream type. |
| */ |
| public StreamWriterRunnable(String streamId, String streamTypeId, StreamsDataProvider provider) { |
| Assert.isNotNull(streamId); |
| Assert.isNotNull(streamTypeId); |
| Assert.isNotNull(provider); |
| Assert.isTrue(provider.isApplicable(streamTypeId)); |
| |
| this.streamId = streamId; |
| this.streamTypeId = streamTypeId; |
| this.provider = provider; |
| } |
| |
| /** |
| * Returns the associated stream id. |
| * |
| * @return The associated stream id. |
| */ |
| public final String getStreamId() { |
| return streamId; |
| } |
| |
| /** |
| * Returns the associated stream type id. |
| * |
| * @return The associated stream type id. |
| */ |
| public final String getStreamTypeId() { |
| return streamTypeId; |
| } |
| |
| /** |
| * Stop the runnable. |
| * |
| * @param callback The callback to invoke if the runnable stopped. |
| */ |
| public final synchronized void stop(ICallback callback) { |
| // If the runnable is stopped already, invoke the callback directly |
| if (stopped) { |
| if (callback != null) callback.done(this, Status.OK_STATUS); |
| return; |
| } |
| |
| // Store the callback instance |
| this.callback = callback; |
| // Mark the runnable as stopped |
| stopped = true; |
| } |
| |
| /** |
| * Returns if the runnable should stop. |
| */ |
| protected final synchronized boolean isStopped() { |
| return stopped; |
| } |
| |
| /** |
| * Sets the currently active writer task. |
| * |
| * @param task The currently active writer task or <code>null</code>. |
| */ |
| protected final void setActiveTask(TCFTask<Object> task) { |
| activeTask = task; |
| } |
| |
| /** |
| * Returns the currently active writer task. |
| * |
| * @return The currently active writer task or <code>null</code>. |
| */ |
| protected final TCFTask<Object> getActiveTask() { |
| return activeTask; |
| } |
| |
| /** |
| * Returns the callback instance to invoke. |
| * |
| * @return The callback instance or <code>null</code>. |
| */ |
| protected final ICallback getCallback() { |
| return callback; |
| } |
| |
| /* (non-Javadoc) |
| * @see java.lang.Runnable#run() |
| */ |
| @Override |
| public void run() { |
| // If not data provider is set, we are done here immediately |
| if (provider == null) { |
| synchronized (this) { |
| // Mark the runnable definitely stopped |
| stopped = true; |
| } |
| // Invoke the callback directly, if any |
| if (callback != null) callback.done(this, Status.OK_STATUS); |
| |
| return; |
| } |
| |
| // Create the data buffer instance |
| final char[] buffer = new char[1024]; |
| |
| // Run until stopped and the streams service is available |
| while (!isStopped() && svcStreams != null) { |
| try { |
| // Read available data from the data provider |
| int charactersRead = provider.getReader().read(buffer, 0, 1024); |
| // Have we reached the end of the stream -> break out |
| if (charactersRead == -1) break; |
| // If we read some data from the provider, write it to the stream |
| if (charactersRead > 0) write(svcStreams, streamId, new String(buffer).getBytes(), charactersRead); |
| } catch (Exception e) { |
| // An error occurred -> Dump to the error log |
| e = ExceptionUtils.checkAndUnwrapException(e); |
| // Check if the blocking read task got canceled |
| if (!(e instanceof CancellationException)) { |
| // Log the error to the user, might be something serious |
| IStatus status = new Status(IStatus.ERROR, CoreBundleActivator.getUniqueIdentifier(), |
| NLS.bind(Messages.ProcessStreamWriterRunnable_error_writeFailed, streamId, e.getLocalizedMessage()), |
| e); |
| Platform.getLog(CoreBundleActivator.getContext().getBundle()).log(status); |
| } |
| // break out of the loop |
| break; |
| } |
| } |
| |
| // Disconnect from the stream |
| if (svcStreams != null) { |
| Runnable runnable = new Runnable() { |
| @Override |
| public void run() { |
| svcStreams.eos(streamId, new IStreams.DoneEOS() { |
| @Override |
| public void doneEOS(IToken token, Exception error) { |
| svcStreams.disconnect(streamId, new IStreams.DoneDisconnect() { |
| @Override |
| @SuppressWarnings("synthetic-access") |
| public void doneDisconnect(IToken token, Exception error) { |
| synchronized (this) { |
| // Mark the runnable definitely stopped |
| stopped = true; |
| } |
| // Disconnect is done, ignore any error, invoke the callback |
| if (getCallback() != null) getCallback().done(this, Status.OK_STATUS); |
| } |
| }); |
| } |
| }); |
| } |
| }; |
| |
| Protocol.invokeLater(runnable); |
| } else { |
| synchronized (this) { |
| // Mark the runnable definitely stopped |
| stopped = true; |
| } |
| // Invoke the callback directly, if any |
| if (callback != null) callback.done(this, Status.OK_STATUS); |
| } |
| } |
| |
| /** |
| * Writes data to the stream. |
| * |
| * @param service The streams service. Must not be <code>null</code>. |
| * @param streamId The stream id. Must not be <code>null</code>. |
| * @param data The data buffer. Must not be <code>null</code>. |
| * @param size The size of the data to write. |
| * |
| * @throws Exception In case the write fails. |
| */ |
| protected final void write(final IStreams service, final String streamId, final byte[] data, final int size) throws Exception { |
| Assert.isNotNull(service); |
| Assert.isNotNull(streamId); |
| Assert.isTrue(!Protocol.isDispatchThread()); |
| |
| // Create the task object |
| TCFTask<Object> task = new TCFTask<Object>() { |
| @Override |
| public void run() { |
| service.write(streamId, data, 0, size, new IStreams.DoneWrite() { |
| /* (non-Javadoc) |
| * @see org.eclipse.tcf.services.IStreams.DoneWrite#doneWrite(org.eclipse.tcf.protocol.IToken, java.lang.Exception) |
| */ |
| @Override |
| public void doneWrite(IToken token, Exception error) { |
| if (error == null) done(null); |
| else error(error); |
| } |
| }); |
| } |
| }; |
| task.get(); |
| |
| // Push the task object to the runnable instance |
| setActiveTask(task); |
| |
| // Execute the write |
| task.get(); |
| } |
| } |
| |
| /** |
| * Constructor. |
| * |
| * @param parent The parent process launcher instance. Must not be <code>null</code> |
| */ |
| public ProcessStreamsListener(ProcessLauncher parent) { |
| Assert.isNotNull(parent); |
| this.channel = parent.getChannel(); |
| this.svcStreams = parent.getSvcStreams(); |
| this.svcProcessesName = parent.getSvcProcesses() instanceof IProcessesV1 ? IProcessesV1.NAME : IProcesses.NAME; |
| } |
| |
| /* (non-Javadoc) |
| * @see org.eclipse.tcf.te.runtime.interfaces.IDisposable#dispose() |
| */ |
| @Override |
| public void dispose() { |
| dispose(null); |
| } |
| |
| /** |
| * Dispose the streams listener instance. |
| * |
| * @param callback The callback to invoke if the dispose finished or <code>null</code>. |
| */ |
| public void dispose(final ICallback callback) { |
| // Store a final reference to the streams listener instance |
| final IChannelManager.IStreamsListener finStreamsListener = this; |
| |
| // Store a final reference to the data receivers list |
| final List<StreamsDataReceiver> finDataReceivers; |
| synchronized (dataReceiver) { |
| finDataReceivers = new ArrayList<StreamsDataReceiver>(dataReceiver); |
| dataReceiver.clear(); |
| } |
| |
| // Create a new collector to catch all runnable stop callback's |
| AsyncCallbackCollector collector = new AsyncCallbackCollector(new Callback() { |
| /* (non-Javadoc) |
| * @see org.eclipse.tcf.te.runtime.callback.Callback#internalDone(java.lang.Object, org.eclipse.core.runtime.IStatus) |
| */ |
| @Override |
| protected void internalDone(final Object caller, final IStatus status) { |
| Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ |
| // Unsubscribe the streams listener from the service |
| Tcf.getChannelManager().unsubscribeStream(channel, svcProcessesName, finStreamsListener, new IChannelManager.DoneUnsubscribeStream() { |
| @Override |
| public void doneUnsubscribeStream(Throwable error) { |
| // Loop all registered listeners and close them |
| for (StreamsDataReceiver receiver : finDataReceivers) receiver.dispose(); |
| // Call the original outer callback |
| if (callback != null) callback.done(caller, status); |
| } |
| }); |
| |
| // Clean the list of processed created events |
| processedCreatedEvents.clear(); |
| } |
| }, new CallbackInvocationDelegate()); |
| |
| // Loop all runnable's and force them to stop |
| synchronized (runnables) { |
| for (Runnable runnable : runnables) { |
| if (runnable instanceof StreamReaderRunnable) { |
| ((StreamReaderRunnable)runnable).stop(new AsyncCallbackCollector.SimpleCollectorCallback(collector)); |
| } |
| if (runnable instanceof StreamWriterRunnable) { |
| ((StreamWriterRunnable)runnable).stop(new AsyncCallbackCollector.SimpleCollectorCallback(collector)); |
| } |
| } |
| runnables.clear(); |
| } |
| |
| // Mark the collector initialization done |
| collector.initDone(); |
| } |
| |
| /* (non-Javadoc) |
| * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.IStreamsListener#setProxy(org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.IStreamsListenerProxy) |
| */ |
| @Override |
| public void setProxy(IStreamsListenerProxy proxy) { |
| this.proxy = proxy; |
| } |
| |
| /** |
| * Adds the given receiver to the stream data receiver list. |
| * |
| * @param receiver The data receiver. Must not be <code>null</code>. |
| */ |
| public void registerDataReceiver(StreamsDataReceiver receiver) { |
| Assert.isNotNull(receiver); |
| synchronized (dataReceiver) { |
| if (!dataReceiver.contains(receiver)) dataReceiver.add(receiver); |
| } |
| } |
| |
| /** |
| * Removes the given receiver from the stream data receiver list. |
| * |
| * @param receiver The data receiver. Must not be <code>null</code>. |
| */ |
| public void unregisterDataReceiver(StreamsDataReceiver receiver) { |
| Assert.isNotNull(receiver); |
| synchronized (dataReceiver) { |
| dataReceiver.remove(receiver); |
| } |
| } |
| |
| /** |
| * Sets the stream data provider instance. |
| * |
| * @param provider The stream data provider instance or <code>null</code>. |
| */ |
| public void setDataProvider(StreamsDataProvider provider) { |
| dataProvider = provider; |
| } |
| |
| /** |
| * Returns the stream data provider instance. |
| * |
| * @return The stream data provider instance or <code>null</code>. |
| */ |
| public StreamsDataProvider getDataProvider() { |
| return dataProvider; |
| } |
| |
| /* (non-Javadoc) |
| * @see org.eclipse.tcf.te.tcf.processes.core.interfaces.launcher.IProcessContextAwareListener#setProcessContext(org.eclipse.tcf.services.IProcesses.ProcessContext) |
| */ |
| @Override |
| public void setProcessContext(IProcesses.ProcessContext context) { |
| Assert.isNotNull(context); |
| this.context = context; |
| |
| if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_STREAMS_LISTENER)) { |
| CoreBundleActivator.getTraceHandler().trace("Process context set to: id='" + context.getID() + "', name='" + context.getName() + "'", //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ |
| 0, ITraceIds.TRACE_STREAMS_LISTENER, |
| IStatus.INFO, getClass()); |
| } |
| |
| // Ask the proxy to process all delayed created events |
| if (proxy != null) proxy.processDelayedCreatedEvents(); |
| } |
| |
| /* (non-Javadoc) |
| * @see org.eclipse.tcf.te.tcf.processes.core.interfaces.launcher.IProcessContextAwareListener#getProcessContext() |
| */ |
| @Override |
| public final IProcesses.ProcessContext getProcessContext() { |
| return context; |
| } |
| |
| /* (non-Javadoc) |
| * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.IStreamsListener#hasContext() |
| */ |
| @Override |
| public final boolean hasContext() { |
| if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_STREAMS_LISTENER)) { |
| CoreBundleActivator.getTraceHandler().trace("Remote process stream listener: hasContext = " + (context != null), //$NON-NLS-1$ |
| 0, ITraceIds.TRACE_STREAMS_LISTENER, |
| IStatus.INFO, getClass()); |
| } |
| return context != null; |
| } |
| |
| /* (non-Javadoc) |
| * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.IStreamsListener#isCreatedConsumed(java.lang.String, java.lang.String, java.lang.String) |
| */ |
| @Override |
| public final boolean isCreatedConsumed(String stream_type, String stream_id, String context_id) { |
| boolean consumed = context != null && context.getID().equals(context_id); |
| |
| if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_STREAMS_LISTENER)) { |
| CoreBundleActivator.getTraceHandler().trace("Remote process stream listener: isCreatedConsumed = " + consumed, //$NON-NLS-1$ |
| 0, ITraceIds.TRACE_STREAMS_LISTENER, |
| IStatus.INFO, getClass()); |
| } |
| |
| return consumed; |
| } |
| |
| /* (non-Javadoc) |
| * @see org.eclipse.tcf.services.IStreams.StreamsListener#created(java.lang.String, java.lang.String, java.lang.String) |
| */ |
| @Override |
| public void created(final String streamType, final String streamId, final String contextId) { |
| // We ignore any other stream type than the associated process service name |
| if (!svcProcessesName.equals(streamType)) return; |
| |
| if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_STREAMS_LISTENER)) { |
| CoreBundleActivator.getTraceHandler().trace("New remote process stream created: streamId='" + streamId + "', contextId='" + contextId + "'", //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ |
| 0, ITraceIds.TRACE_STREAMS_LISTENER, |
| IStatus.INFO, getClass()); |
| } |
| |
| // Create the internal representation of the created event |
| final String event = streamType + ";" + streamId + ";" + contextId; //$NON-NLS-1$ //$NON-NLS-2$ |
| |
| // Check if the created event is really consumed by us |
| if (isCreatedConsumed(streamType, streamId, contextId) && !processedCreatedEvents.contains(event)) { |
| // Create a snapshot of the registered data receivers |
| StreamsDataReceiver[] receivers; |
| synchronized (dataReceiver) { |
| receivers = dataReceiver.toArray(new StreamsDataReceiver[dataReceiver.size()]); |
| } |
| // The created event is for the monitored process context |
| // --> Create the stream reader thread(s) |
| if (streamId != null && streamId.equals(context.getProperties().get(IProcesses.PROP_STDIN_ID))) { |
| // Data provider set? |
| if (dataProvider != null) { |
| // Create the stdin stream writer runnable |
| StreamWriterRunnable runnable = new StreamWriterRunnable(streamId, IProcesses.PROP_STDIN_ID, dataProvider); |
| // Add to the list of created runnable's |
| synchronized (runnables) { runnables.add(runnable); } |
| // And create and start the thread |
| Thread thread = new Thread(runnable, "Thread-" + IProcesses.PROP_STDIN_ID + "-" + streamId); //$NON-NLS-1$ //$NON-NLS-2$ |
| thread.start(); |
| } |
| } |
| if (streamId != null && streamId.equals(context.getProperties().get(IProcesses.PROP_STDOUT_ID))) { |
| // Create the stdout stream reader runnable |
| StreamReaderRunnable runnable = new StreamReaderRunnable(streamId, IProcesses.PROP_STDOUT_ID, receivers); |
| // If not empty, create the thread |
| if (!runnable.isEmpty()) { |
| // Add to the list of created runnable's |
| synchronized (runnables) { runnables.add(runnable); } |
| // And create and start the thread |
| Thread thread = new Thread(runnable, "Thread-" + IProcesses.PROP_STDOUT_ID + "-" + streamId); //$NON-NLS-1$ //$NON-NLS-2$ |
| thread.start(); |
| } |
| } |
| if (streamId != null && streamId.equals(context.getProperties().get(IProcesses.PROP_STDERR_ID))) { |
| // Create the stdout stream reader runnable |
| StreamReaderRunnable runnable = new StreamReaderRunnable(streamId, IProcesses.PROP_STDERR_ID, receivers); |
| // If not empty, create the thread |
| if (!runnable.isEmpty()) { |
| // Add to the list of created runnable's |
| synchronized (runnables) { runnables.add(runnable); } |
| // And create and start the thread |
| Thread thread = new Thread(runnable, "Thread-" + IProcesses.PROP_STDERR_ID + "-" + streamId); //$NON-NLS-1$ //$NON-NLS-2$ |
| thread.start(); |
| } |
| } |
| |
| // Remember that we have seen this event already in order to avoid to process it again |
| // if the streams listener proxy is iterating through delayed events |
| processedCreatedEvents.add(event); |
| } |
| } |
| |
| /* (non-Javadoc) |
| * @see org.eclipse.tcf.services.IStreams.StreamsListener#disposed(java.lang.String, java.lang.String) |
| */ |
| @Override |
| public void disposed(String streamType, String streamId) { |
| // We ignore any other stream type than the associated process service name |
| if (!svcProcessesName.equals(streamType)) return; |
| |
| boolean consumed = false; |
| |
| // Stop the thread(s) if the disposed event is for the active |
| // monitored stream id(s). |
| synchronized (runnables) { |
| Iterator<Runnable> iterator = runnables.iterator(); |
| while (iterator.hasNext()) { |
| Runnable runnable = iterator.next(); |
| if (runnable instanceof StreamReaderRunnable) { |
| StreamReaderRunnable myRunnable = (StreamReaderRunnable)runnable; |
| if (myRunnable.getStreamId().equals(streamId)) { |
| // This method is called within the TCF event dispatch thread, so |
| // we cannot wait for a callback here |
| myRunnable.stop(null); |
| iterator.remove(); |
| consumed |= true; |
| } |
| } |
| } |
| } |
| |
| if (consumed) { |
| if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_STREAMS_LISTENER)) { |
| CoreBundleActivator.getTraceHandler().trace("Remote process stream disposed: streamId='" + streamId + "'", //$NON-NLS-1$ //$NON-NLS-2$ |
| 0, ITraceIds.TRACE_STREAMS_LISTENER, |
| IStatus.INFO, getClass()); |
| } |
| } |
| } |
| } |