blob: 0001626bc52758648e387e776325c84e810f7838 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008 IBM Corporation.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* IBM Corporation - initial API and implementation
******************************************************************************/
package org.eclipse.ptp.rm.mpi.openmpi.core.rtsystem;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.core.runtime.CoreException;
import org.eclipse.core.runtime.IProgressMonitor;
import org.eclipse.osgi.util.NLS;
import org.eclipse.ptp.core.PTPCorePlugin;
import org.eclipse.ptp.core.attributes.AttributeManager;
import org.eclipse.ptp.core.attributes.IAttribute;
import org.eclipse.ptp.core.attributes.IntegerAttribute;
import org.eclipse.ptp.core.elementcontrols.IPProcessControl;
import org.eclipse.ptp.core.elements.IPJob;
import org.eclipse.ptp.core.elements.IPProcess;
import org.eclipse.ptp.core.elements.IPQueue;
import org.eclipse.ptp.core.elements.IResourceManager;
import org.eclipse.ptp.core.elements.attributes.ElementAttributes;
import org.eclipse.ptp.core.elements.attributes.JobAttributes;
import org.eclipse.ptp.core.elements.attributes.ProcessAttributes;
import org.eclipse.ptp.remote.core.IRemoteProcessBuilder;
import org.eclipse.ptp.rm.core.MPIJobAttributes;
import org.eclipse.ptp.rm.core.rtsystem.AbstractToolRuntimeSystem;
import org.eclipse.ptp.rm.core.rtsystem.AbstractToolRuntimeSystemJob;
import org.eclipse.ptp.rm.core.utils.DebugUtil;
import org.eclipse.ptp.rm.core.utils.IInputStreamListener;
import org.eclipse.ptp.rm.core.utils.InputStreamListenerToOutputStream;
import org.eclipse.ptp.rm.core.utils.InputStreamObserver;
import org.eclipse.ptp.rm.mpi.openmpi.core.OpenMPILaunchAttributes;
import org.eclipse.ptp.rm.mpi.openmpi.core.OpenMPIPlugin;
import org.eclipse.ptp.rm.mpi.openmpi.core.messages.Messages;
import org.eclipse.ptp.rm.mpi.openmpi.core.rmsystem.IOpenMPIResourceManagerConfiguration;
import org.eclipse.ptp.rm.mpi.openmpi.core.rtsystem.OpenMPIProcessMap.Process;
/**
* Master job that implements the Open MPI runtime system. This job must implement each of the
* various phases of the runtime system:
*
* doPrepareExecution - do any debugger startup actions
* doBeforeExecution - merge streams if necessary
* doExecutionStarted - parse output from the mpirun command
* doWaitExecution - wait until execution has completed
* doExecutionFinished - deal with any issues from program termination
* doExecutionCleanUp - cleanup after execution
*
* The type/format of output depends on a range of factors, including the OMPI version and the capabilities of the
* remote service provider being used.
*
* OMPI 1.2 generates map data in textual form that must be parsed to extract the relevant information. Map information is
* sent to stderr, but RSE does not separate stdout/stderr so this must be handled as a special case.
*
* OMPI 1.3.x generates map data in (malformed) XML format so we use an XML parser to extract information.
*
* OMPI 1.3.[1,2] wrap stdout and stderr from the program in XML tags, but they are still sent to the respective streams.
*
* OMPI 1.3 does not wrap stdout and stderr from the program in XML tags.
*
* OMPI 1.3.2 adds <noderesolve> elements to the XML map data.
*
* OMPI 1.3.[1,2,3] malform the XML by dropping </stdout> tags on some lines.
*
* OMPI 1.3.4 and 1.4 add <mpirun> and </mpirun> root tags
*
* @author Daniel Felix Ferber
* @author Greg Watson
*
*/
public class OpenMPIRuntimeSystemJob extends AbstractToolRuntimeSystemJob {
// Observers attached to the stderr/stdout streams of the launched process.
// They are created in doExecutionStarted() and killed/cleared in doExecutionCleanUp().
private InputStreamObserver stderrObserver;
private InputStreamObserver stdoutObserver;
/** Exception raised while parsing mpi map information. */
protected Exception parserException = null;
/** Error detected in mpirun output */
// NOTE(review): errorDetected/errorMessage are written from the stream listener callbacks
// and read in doExecutionFinished(); they are not volatile - confirm visibility is acceptable.
protected boolean errorDetected = false;
protected String errorMessage = null;
/** Used to signal map information completed */
// mapCompleted is set under mapLock in setMapCompleted() and awaited in waitForMapCompleted().
protected boolean mapCompleted = false;
protected final ReentrantLock mapLock = new ReentrantLock();
protected final Condition mapCondition = mapLock.newCondition();
/** Main parser thread */
// Both are assigned by createParser(); the listener is what gets registered on an observer
// so that process output reaches the parser thread.
protected Thread parserThread;
protected InputStreamListenerToOutputStream parserListener;
/**
 * Create a new Open MPI runtime system job. All arguments are passed
 * unchanged to the {@link AbstractToolRuntimeSystemJob} constructor.
 *
 * @param jobID ID of the job
 * @param queueID ID of the queue the job belongs to
 * @param name name of the job
 * @param rtSystem runtime system that owns this job
 * @param attrMgr attributes supplied for the job
 */
public OpenMPIRuntimeSystemJob(
        String jobID,
        String queueID,
        String name,
        AbstractToolRuntimeSystem rtSystem,
        AttributeManager attrMgr) {
    super(jobID, queueID, name, rtSystem, attrMgr);
}
/**
 * Mark every process of this job that is not yet COMPLETED as COMPLETED.
 *
 * The job is looked up through the resource manager and queue; if any of
 * those cannot be found the method silently does nothing.
 */
private void terminateProcesses() {
    final OpenMPIRuntimeSystem runtime = (OpenMPIRuntimeSystem) getRtSystem();
    final IResourceManager manager = PTPCorePlugin.getDefault().getUniverse().getResourceManager(runtime.getRmID());
    if (manager == null) {
        return;
    }
    final IPQueue jobQueue = manager.getQueueById(getQueueID());
    if (jobQueue == null) {
        return;
    }
    final IPJob modelJob = jobQueue.getJobById(getJobID());
    if (modelJob == null) {
        return;
    }

    // Collect the IDs first, then apply the state change to each of them.
    final List<String> unfinishedIds = new ArrayList<String>();
    for (IPProcess candidate : modelJob.getProcesses()) {
        if (candidate.getState() != ProcessAttributes.State.COMPLETED) {
            unfinishedIds.add(candidate.getID());
        }
    }

    final AttributeManager completedState = new AttributeManager();
    completedState.addAttribute(ProcessAttributes.getStateAttributeDefinition().create(ProcessAttributes.State.COMPLETED));
    for (String id : unfinishedIds) {
        runtime.changeProcess(id, completedState);
    }
}
/**
 * Update the model process that corresponds to a process reported by the
 * map parser: set its name, node ID, RUNNING state and any attributes the
 * map supplied.
 *
 * Nothing happens when the node name cannot be resolved to a node ID or
 * when the job has no process at the reported index.
 *
 * @param job job that owns the process
 * @param proc process description taken from the map
 */
protected void addProcess(IPJob job, Process proc) {
    final OpenMPIRuntimeSystem runtime = (OpenMPIRuntimeSystem) getRtSystem();
    final String resolvedNode = proc.getNode().getResolvedName();
    final String nodeID = runtime.getNodeIDforName(resolvedNode);
    if (nodeID == null) {
        return;
    }

    final int index = proc.getIndex();

    // An empty name from the map is replaced by the process index.
    String label = proc.getName();
    if (label.length() == 0) {
        label = String.valueOf(index);
    }

    final IPProcessControl target = (IPProcessControl) job.getProcessByIndex(index);
    if (target == null) {
        return;
    }

    final AttributeManager changes = new AttributeManager();
    changes.addAttribute(ElementAttributes.getNameAttributeDefinition().create(label));
    changes.addAttribute(ProcessAttributes.getNodeIdAttributeDefinition().create(nodeID));
    changes.addAttribute(ProcessAttributes.getStateAttributeDefinition().create(ProcessAttributes.State.RUNNING));
    changes.addAttributes(proc.getAttributeManager().getAttributes());
    getRtSystem().changeProcess(target.getID(), changes);
}
/**
* Create (but do not start) the thread that parses the process map output.
*
* A connected PipedOutputStream/PipedInputStream pair is set up. The output end is
* wrapped in an InputStreamListenerToOutputStream that is stored via
* setParserListener(); the parser thread reads from the input end. The thread is
* stored in parserThread and is started by the caller.
*
* Inside the thread, the parser is chosen from configuration.getDetectedVersion():
* the text parser for VERSION_12, the XML parser for VERSION_13 and VERSION_14.
* Any exception thrown while parsing is saved in parserException, and
* setMapCompleted() is always called when the thread body ends.
*
* @param configuration resource manager configuration used to select the parser
* @param job job whose processes are updated from the parsed output
*/
protected void createParser(final IOpenMPIResourceManagerConfiguration configuration, final IPJob job) {
/*
* Thread that parses map information.
*/
final PipedOutputStream parserOutputStream = new PipedOutputStream();
final PipedInputStream parserInputStream = new PipedInputStream();
try {
parserInputStream.connect(parserOutputStream);
} catch (IOException e) {
assert false; // This exception is not possible
}
// The listener is registered on an observer later; presumably it forwards the bytes it
// receives to parserOutputStream, which feeds parserInputStream - verify against the listener class.
setParserListener(new InputStreamListenerToOutputStream(parserOutputStream));
parserThread = new Thread() {
@Override
public void run() {
DebugUtil.trace(DebugUtil.RTS_JOB_TRACING_MORE, "RTS job #{0}: display-map parser thread: started", getJobID()); //$NON-NLS-1$
try {
// Parse stdout or stderr, depending on mpi 1.2 or 1.3
if (configuration.getDetectedVersion().equals(IOpenMPIResourceManagerConfiguration.VERSION_12)) {
// OMPI 1.2: textual map. Only map and process callbacks do any work here.
OpenMPIProcessMapText12Parser.parse(parserInputStream, new IOpenMPIProcessMapParserListener() {
public void finish() {
// Empty
}
public void finishMap(AttributeManager manager) {
/*
* Copy job attributes from map.
*/
if (manager.getAttributes().length > 0) {
DebugUtil.trace(DebugUtil.RTS_JOB_TRACING_MORE, "RTS job #{0}: updating model with display-map information", getJobID()); //$NON-NLS-1$
getRtSystem().changeJob(getJobID(), manager);
}
// Wake up anyone blocked in waitForMapCompleted().
setMapCompleted();
}
public void newProcess(Process proc) {
addProcess(job, proc);
}
public void start() {
// Empty
}
public void stderr(Process proc, String output) {
// Empty
}
public void stdout(Process proc, String output) {
// Empty
}
});
} else if (configuration.getDetectedVersion().equals(IOpenMPIResourceManagerConfiguration.VERSION_13)
|| configuration.getDetectedVersion().equals(IOpenMPIResourceManagerConfiguration.VERSION_14)) {
// OMPI 1.3/1.4: XML map. The piped stream is wrapped in OpenMPI13xInputStream before parsing.
InputStream is = new OpenMPI13xInputStream(parserInputStream);
OpenMPIProcessMapXml13Parser.parse(is, new IOpenMPIProcessMapParserListener() {
public void finish() {
/*
* Turn off listener that generates input for parser when parsing finishes.
* If not done, the parser will close the piped inputstream, making the listener
* get IOExceptions for closed stream.
*/
getParserListener().disable();
}
public void finishMap(AttributeManager manager) {
/*
* Copy job attributes from map.
*/
if (manager.getAttributes().length > 0) {
DebugUtil.trace(DebugUtil.RTS_JOB_TRACING_MORE, "RTS job #{0}: updating model with display-map information", getJobID()); //$NON-NLS-1$
getRtSystem().changeJob(getJobID(), manager);
}
// Wake up anyone blocked in waitForMapCompleted().
setMapCompleted();
}
public void newProcess(Process proc) {
addProcess(job, proc);
}
public void start() {
// Empty
}
public void stderr(Process proc, String output) {
String stderr = output;
// For VERSION_13 with a service version below 4, a newline is appended to the output.
if (configuration.getDetectedVersion().equals(IOpenMPIResourceManagerConfiguration.VERSION_13)
&& configuration.getServiceVersion() < 4) {
stderr += "\n"; //$NON-NLS-1$
}
// Output without an associated process is attributed to the process at index 0.
int index = 0;
if (proc != null) {
index = proc.getIndex();
}
IPProcessControl process = (IPProcessControl)job.getProcessByIndex(index);
if (process != null) {
process.addAttribute(ProcessAttributes.getStderrAttributeDefinition().create(stderr));
}
}
public void stdout(Process proc, String output) {
String stdout = output;
// For VERSION_13 with a service version below 4, a newline is appended to the output.
if (configuration.getDetectedVersion().equals(IOpenMPIResourceManagerConfiguration.VERSION_13)
&& configuration.getServiceVersion() < 4) {
stdout += "\n"; //$NON-NLS-1$
}
// Output without an associated process is attributed to the process at index 0.
int index = 0;
if (proc != null) {
index = proc.getIndex();
}
IPProcessControl process = (IPProcessControl)job.getProcessByIndex(index);
if (process != null) {
process.addAttribute(ProcessAttributes.getStdoutAttributeDefinition().create(stdout));
}
}
});
} else {
// Any other detected version is unexpected; with assertions disabled nothing is parsed.
assert false;
}
} catch (Exception e) {
// Saved so that doExecutionStarted()/doWaitExecution() can report the failure.
parserException = e;
DebugUtil.error(DebugUtil.RTS_JOB_TRACING_MORE, "RTS job #{0}: display-map parser thread: {1}", getJobID(), e); //$NON-NLS-1$
} finally {
// For VERSION_12 the listener is disabled here; the XML branch disables it in finish().
// NOTE(review): if the XML parser throws before finish() is called the listener stays enabled - confirm this is intended.
if (configuration.getDetectedVersion().equals(IOpenMPIResourceManagerConfiguration.VERSION_12)) {
getParserListener().disable();
}
// Always signal completion so waitForMapCompleted() returns even when parsing failed
// or no map was found.
setMapCompleted();
}
DebugUtil.trace(DebugUtil.RTS_JOB_TRACING_MORE, "RTS job #{0}: display-map parser thread: finished", getJobID()); //$NON-NLS-1$
}
};
}
/**
 * Configure the process builder before the command is launched.
 *
 * For OMPI 1.3.[1,2] stdout and stderr are wrapped in XML tags but still
 * arrive on separate streams, so stderr is redirected into stdout to get a
 * single merged stream.
 *
 * @see org.eclipse.ptp.rm.core.rtsystem.AbstractToolRuntimeSystemJob#doBeforeExecution(org.eclipse.core.runtime.IProgressMonitor, org.eclipse.ptp.remote.core.IRemoteProcessBuilder)
 */
@Override
protected void doBeforeExecution(IProgressMonitor monitor, IRemoteProcessBuilder builder) throws CoreException {
    final IOpenMPIResourceManagerConfiguration config = (IOpenMPIResourceManagerConfiguration) getRtSystem().getRmConfiguration();

    final boolean isVersion13 = config.getDetectedVersion().equals(IOpenMPIResourceManagerConfiguration.VERSION_13);
    if (!isVersion13) {
        return;
    }

    // Only service versions 1 and 2 need the merged stream.
    final boolean needsMerge = config.getServiceVersion() > 0 && config.getServiceVersion() < 3;
    if (needsMerge) {
        builder.redirectErrorStream(true);
    }
}
/**
 * Release everything that was set up for the execution: destroy the
 * launched process, kill both stream observers, and finally mark all
 * remaining processes of the job as completed.
 *
 * @see org.eclipse.ptp.rm.core.rtsystem.AbstractToolRuntimeSystemJob#doExecutionCleanUp(org.eclipse.core.runtime.IProgressMonitor)
 */
@Override
protected void doExecutionCleanUp(IProgressMonitor monitor) {
    if (getProcess() != null) {
        getProcess().destroy();
        setProcess(null);
    }

    final InputStreamObserver errObserver = getStderrObserver();
    if (errObserver != null) {
        errObserver.kill();
        setStderrObserver(null);
    }

    final InputStreamObserver outObserver = getStdoutObserver();
    if (outObserver != null) {
        outObserver.kill();
        setStdoutObserver(null);
    }

    // TODO: more cleanup?
    terminateProcesses();
}
/**
 * Handle termination of the mpirun process.
 *
 * All processes of the job are marked as completed first. Then the outcome
 * is evaluated:
 * - non-zero exit value and the job was not terminated by the user: the job
 *   status is set to ERROR with a message containing the exit value;
 * - non-zero exit value and the job was terminated by the user: the exit
 *   value is ignored and only traced;
 * - zero exit value but an error was detected in the output: the job status
 *   is set to ERROR with the detected error message.
 *
 * @see org.eclipse.ptp.rm.core.rtsystem.AbstractToolRuntimeSystemJob#doExecutionFinished(org.eclipse.core.runtime.IProgressMonitor)
 */
@Override
protected void doExecutionFinished(IProgressMonitor monitor) throws CoreException {
    terminateProcesses();

    final int exitValue = getProcess().exitValue();
    if (exitValue != 0) {
        if (!terminateJobFlag) {
            changeJobStatusMessage(NLS.bind(Messages.OpenMPIRuntimeSystemJob_Exception_ExecutionFailedWithExitValue, Integer.valueOf(exitValue)));
            changeJobStatus(MPIJobAttributes.Status.ERROR);
        } else {
            // Only report the exit value as ignored when the user really forced termination;
            // previously this was traced even for genuine failures.
            DebugUtil.trace(DebugUtil.RTS_JOB_TRACING, "RTS job #{0}: ignoring exit value {1} because job was forced to terminate by user", getJobID(), Integer.valueOf(exitValue)); //$NON-NLS-1$
        }
    } else if (errorDetected) {
        changeJobStatusMessage(NLS.bind(Messages.OpenMPIRuntimeSystemJob_Exception_ExecutionFailureDetected, errorMessage));
        changeJobStatus(MPIJobAttributes.Status.ERROR);
    }
}
/**
 * Set up output handling for the launched mpirun process and wait for the
 * process map to be read.
 *
 * Steps performed:
 * 1. Create the model processes for the job (count taken from the job's
 *    number-of-processes attribute).
 * 2. Create listeners that scan stdout/stderr for errors and, for OMPI 1.2
 *    and 1.3.[0-3], store the raw output on process 0.
 * 3. Create and start the map parser thread, then attach the parser listener
 *    to the stream that carries the map for the detected version.
 * 4. Start both stream observers and block until the map is complete.
 * 5. If the parser failed, destroy the process, drain both streams and throw.
 *
 * @throws CoreException if the output of the Open MPI command could not be parsed
 * @see org.eclipse.ptp.rm.core.rtsystem.AbstractToolRuntimeSystemJob#doExecutionStarted(org.eclipse.core.runtime.IProgressMonitor)
 */
@Override
protected void doExecutionStarted(IProgressMonitor monitor) throws CoreException {
    mapCompleted = false;

    /*
     * Create processes for the job.
     */
    final IOpenMPIResourceManagerConfiguration configuration = (IOpenMPIResourceManagerConfiguration) getRtSystem().getRmConfiguration();
    final IPJob ipJob = PTPCorePlugin.getDefault().getUniverse().getResourceManager(getRtSystem().getRmID()).getQueueById(getQueueID()).getJobById(getJobID());
    IntegerAttribute numProcsAttr = ipJob.getAttribute(JobAttributes.getNumberOfProcessesAttributeDefinition());
    assert numProcsAttr != null;
    getRtSystem().createProcesses(getJobID(), numProcsAttr.getValue().intValue());

    /*
     * We only require procZero if we're using OMPI 1.2.x or 1.3.[0-3]. Other versions use XML
     * for stdout and stderr.
     */
    final IPProcess procZero;
    if (configuration.getDetectedVersion().equals(IOpenMPIResourceManagerConfiguration.VERSION_12)
            || (configuration.getDetectedVersion().equals(IOpenMPIResourceManagerConfiguration.VERSION_13)
                    && configuration.getServiceVersion() < 4)) {
        procZero = ipJob.getProcessByIndex(0);
    } else {
        procZero = null;
    }

    /*
     * Listener that saves stdout.
     */
    final IInputStreamListener stdoutListener = new IInputStreamListener() {
        public void newBytes(byte[] bytes, int length) {
            String line = new String(bytes, 0, length);
            // Only the first detected error is recorded.
            if (!errorDetected && OpenMPIErrorParser.parse(line)) {
                errorDetected = true;
                errorMessage = OpenMPIErrorParser.getErrorMessage();
            }
            if (procZero != null) {
                procZero.addAttribute(ProcessAttributes.getStdoutAttributeDefinition().create(line));
            }
            DebugUtil.trace(DebugUtil.RTS_JOB_OUTPUT_TRACING, "RTS job #{0}: {1}", getJobID(), line); //$NON-NLS-1$
        }

        public void streamClosed() {
            // No need to do anything
        }

        public void streamError(Exception e) {
            // Job ID goes in {0} and the exception in {1}; the message previously used {0}
            // twice with only the exception as argument, so the job ID was never shown.
            DebugUtil.trace(DebugUtil.RTS_JOB_TRACING_MORE, "RTS job #{0}: stdout stream: {1}", getJobID(), e); //$NON-NLS-1$
            OpenMPIPlugin.log(e);
        }
    };

    /*
     * Listener that saves stderr.
     */
    final IInputStreamListener stderrListener = new IInputStreamListener() {
        public void newBytes(byte[] bytes, int length) {
            String line = new String(bytes, 0, length);
            // Only the first detected error is recorded.
            if (!errorDetected && OpenMPIErrorParser.parse(line)) {
                errorDetected = true;
                errorMessage = OpenMPIErrorParser.getErrorMessage();
            }
            if (procZero != null) {
                procZero.addAttribute(ProcessAttributes.getStderrAttributeDefinition().create(line));
            }
            DebugUtil.error(DebugUtil.RTS_JOB_OUTPUT_TRACING, "RTS job #{0}: {1}", getJobID(), line); //$NON-NLS-1$
        }

        public void streamClosed() {
            // No need to do anything
        }

        public void streamError(Exception e) {
            // Same placeholder fix as for the stdout listener.
            DebugUtil.trace(DebugUtil.RTS_JOB_TRACING_MORE, "RTS job #{0}: stderr stream: {1}", getJobID(), e); //$NON-NLS-1$
            OpenMPIPlugin.log(e);
        }
    };

    createParser(configuration, ipJob);

    DebugUtil.trace(DebugUtil.RTS_JOB_TRACING_MORE, "RTS job #{0}: starting all threads", getJobID()); //$NON-NLS-1$

    /*
     * Create and start listeners.
     */
    getParser().start();

    setStderrObserver(new InputStreamObserver(getProcess().getErrorStream()));
    getStderrObserver().addListener(stderrListener);
    setStdoutObserver(new InputStreamObserver(getProcess().getInputStream()));
    getStdoutObserver().addListener(stdoutListener);

    // Parse stdout or stderr, depending on mpi 1.2 or 1.3
    if (configuration.getDetectedVersion().equals(IOpenMPIResourceManagerConfiguration.VERSION_12)) {
        /*
         * Fix for bug #271810: the map is read from stderr, except with the RSE
         * remote services, where it is read from stdout.
         */
        if (!getRtSystem().getRemoteServices().getId().equals("org.eclipse.ptp.remote.RSERemoteServices")) { //$NON-NLS-1$
            getStderrObserver().addListener(getParserListener());
        } else {
            getStdoutObserver().addListener(getParserListener());
        }
    } else if (configuration.getDetectedVersion().equals(IOpenMPIResourceManagerConfiguration.VERSION_13)
            || configuration.getDetectedVersion().equals(IOpenMPIResourceManagerConfiguration.VERSION_14)) {
        getStdoutObserver().addListener(getParserListener());
    } else {
        assert false;
    }

    // Observers are started only after all listeners are attached.
    getStderrObserver().start();
    getStdoutObserver().start();

    waitForMapCompleted();

    if (parserException != null) {
        if (!getProcess().isCompleted()) {
            getProcess().destroy();
        }

        /*
         * Wait until both stdout and stderr stop because stream are closed.
         * Error messages may be still queued in the stream.
         */
        DebugUtil.trace(DebugUtil.RTS_JOB_TRACING_MORE, "RTS job #{0}: waiting stderr thread to finish", getJobID()); //$NON-NLS-1$
        try {
            getStderrObserver().join();
        } catch (InterruptedException e1) {
            // Ignore
        }

        DebugUtil.trace(DebugUtil.RTS_JOB_TRACING_MORE, "RTS job #{0}: waiting stdout thread to finish", getJobID()); //$NON-NLS-1$
        try {
            getStdoutObserver().join();
        } catch (InterruptedException e1) {
            // Ignore
        }

        throw OpenMPIPlugin.coreErrorException("Failed to parse output of Open MPI command. Check output for errors.", parserException); //$NON-NLS-1$
    }
}
/**
 * Preparation phase of the job. This implementation performs no work.
 *
 * @see org.eclipse.ptp.rm.core.rtsystem.AbstractToolRuntimeSystemJob#doPrepareExecution(org.eclipse.core.runtime.IProgressMonitor)
 */
@Override
protected void doPrepareExecution(IProgressMonitor monitor) throws CoreException {
    // Intentionally empty.
}
/**
 * Supply base substitution attributes for the tool command.
 *
 * @return always {@code null}; no base attributes are provided yet
 * @see org.eclipse.ptp.rm.core.rtsystem.AbstractToolRuntimeSystemJob#doRetrieveToolBaseSubstitutionAttributes()
 */
@Override
protected IAttribute<?, ?, ?>[] doRetrieveToolBaseSubstitutionAttributes() throws CoreException {
    // TODO make macros available for environment variables and work directory.
    return null;
}
/**
 * Build the Open MPI specific substitution attributes for the launch command.
 *
 * Two attributes are returned, in this order: one holding the names of all
 * environment variables in {@code environment}, and one created without
 * arguments from the environment-args attribute definition.
 *
 * @param baseSubstitutionAttributeManager not used by this implementation
 * @param directory not used by this implementation
 * @param environment environment variables of the launch; only the keys are read
 * @return array containing the two attributes described above
 * @see org.eclipse.ptp.rm.core.rtsystem.AbstractToolRuntimeSystemJob#doRetrieveToolCommandSubstitutionAttributes(org.eclipse.ptp.core.attributes.AttributeManager, java.lang.String, java.util.Map)
 */
@Override
protected IAttribute<?, ?, ?>[] doRetrieveToolCommandSubstitutionAttributes(
        AttributeManager baseSubstitutionAttributeManager,
        String directory, Map<String, String> environment) {
    // Names of the environment variables, in the key set's iteration order.
    final String[] variableNames = environment.keySet().toArray(new String[environment.size()]);

    final List<IAttribute<?, ?, ?>> result = new ArrayList<IAttribute<?, ?, ?>>();

    // Open MPI specific: list of environment variable names.
    result.add(OpenMPILaunchAttributes.getEnvironmentKeysAttributeDefinition().create(variableNames));

    // Open MPI specific: shortcut that generates arguments for the Open MPI run command.
    result.add(OpenMPILaunchAttributes.getEnvironmentArgsAttributeDefinition().create());

    return result.toArray(new IAttribute<?, ?, ?>[result.size()]);
}
/**
 * Supply additional environment variables for the tool.
 *
 * @return always {@code null}; Open MPI needs no extra environment variables
 */
@Override
protected HashMap<String, String> doRetrieveToolEnvironment() throws CoreException {
    // Nothing extra has to be set for Open MPI.
    return null;
}
/**
 * Job termination hook. This implementation performs no work.
 *
 * @see org.eclipse.ptp.rm.core.rtsystem.AbstractToolRuntimeSystemJob#doTerminateJob()
 */
@Override
protected void doTerminateJob() {
    // Intentionally empty.
}
/**
 * Block until the execution has finished.
 *
 * The method waits, in order, for the map parser thread, the stderr
 * observer and the stdout observer to end. If the parser recorded an
 * exception it is then reported; otherwise the method finally waits for
 * the launched process itself. Interruptions of any wait are ignored.
 *
 * @throws CoreException if the output of the Open MPI command could not be parsed
 * @see org.eclipse.ptp.rm.core.rtsystem.AbstractToolRuntimeSystemJob#doWaitExecution(org.eclipse.core.runtime.IProgressMonitor)
 */
@Override
protected void doWaitExecution(IProgressMonitor monitor) throws CoreException {
    // 1. Parser thread.
    try {
        DebugUtil.trace(DebugUtil.RTS_JOB_TRACING_MORE, "RTS job #{0}: waiting for display-map parser thread to finish", getJobID()); //$NON-NLS-1$
        parserThread.join();
    } catch (InterruptedException interrupted) {
        // Do nothing.
    }

    // 2. Stream observers: they end when the streams are closed,
    //    which means the process has finished.
    DebugUtil.trace(DebugUtil.RTS_JOB_TRACING_MORE, "RTS job #{0}: waiting stderr thread to finish", getJobID()); //$NON-NLS-1$
    try {
        getStderrObserver().join();
    } catch (InterruptedException interrupted) {
        // Ignore
    }

    DebugUtil.trace(DebugUtil.RTS_JOB_TRACING_MORE, "RTS job #{0}: waiting stdout thread to finish", getJobID()); //$NON-NLS-1$
    try {
        getStdoutObserver().join();
    } catch (InterruptedException interrupted) {
        // Ignore
    }

    // 3. Report a parser failure, if there was one.
    if (parserException != null) {
        throw OpenMPIPlugin.coreErrorException("Failed to parse output of Open MPI command. Check output for errors.", parserException); //$NON-NLS-1$
    }

    // 4. Experience has shown that the remote process might not have terminated yet
    //    even though stdout and stderr are closed, so wait for it explicitly.
    DebugUtil.trace(DebugUtil.RTS_JOB_TRACING_MORE, "RTS job #{0}: waiting mpi process to finish completely", getJobID()); //$NON-NLS-1$
    try {
        getProcess().waitFor();
    } catch (InterruptedException interrupted) {
        // Ignore
    }

    DebugUtil.trace(DebugUtil.RTS_JOB_TRACING_MORE, "RTS job #{0}: completely finished", getJobID()); //$NON-NLS-1$
}
/**
 * Get the thread that parses the process map.
 *
 * @return the parser thread, or {@code null} if none has been created yet
 */
protected Thread getParser() {
    return this.parserThread;
}
/**
 * Get the listener that supplies input to the parser thread.
 *
 * @return the parser listener, or {@code null} if none has been set yet
 */
protected InputStreamListenerToOutputStream getParserListener() {
    return this.parserListener;
}
/**
 * Get the observer attached to the stderr stream of the process.
 *
 * @return the stderr observer, or {@code null} if none is set
 */
protected InputStreamObserver getStderrObserver() {
    return this.stderrObserver;
}
/**
 * Get the observer attached to the stdout stream of the process.
 *
 * @return the stdout observer, or {@code null} if none is set
 */
protected InputStreamObserver getStdoutObserver() {
    return this.stdoutObserver;
}
/**
 * Record that the map has been read (or that parsing has ended) and wake
 * up every thread blocked in {@link #waitForMapCompleted()}.
 */
protected void setMapCompleted() {
    this.mapLock.lock();
    try {
        // Flag and signal are both done while holding the lock.
        this.mapCompleted = true;
        this.mapCondition.signalAll();
    } finally {
        this.mapLock.unlock();
    }
}
/**
 * Set the listener that supplies input to the parser thread.
 *
 * @param listener the parser listener to set
 */
protected void setParserListener(InputStreamListenerToOutputStream listener) {
    this.parserListener = listener;
}
/**
 * Set the observer attached to the stderr stream of the process.
 *
 * @param stderrObserver the observer to store; may be {@code null} to clear it
 */
protected void setStderrObserver(InputStreamObserver stderrObserver) {
    // Parameter shadows the field, hence the explicit qualification.
    this.stderrObserver = stderrObserver;
}
/**
 * Set the observer attached to the stdout stream of the process.
 *
 * @param stdoutObserver the observer to store; may be {@code null} to clear it
 */
protected void setStdoutObserver(InputStreamObserver stdoutObserver) {
    // Parameter shadows the field, hence the explicit qualification.
    this.stdoutObserver = stdoutObserver;
}
/**
 * Block until {@link #setMapCompleted()} has been called, i.e. until the
 * map has been read or parsing has ended for some other reason.
 *
 * The wait cannot be cut short by an interrupt. If the calling thread is
 * interrupted while waiting, its interrupt status is still set when this
 * method returns, so callers can observe the interruption.
 */
protected void waitForMapCompleted() {
    mapLock.lock();
    try {
        // Loop to guard against spurious wake-ups.
        while (!mapCompleted) {
            // awaitUninterruptibly() keeps waiting across interrupts but, unlike catching
            // and dropping InterruptedException, does not lose the interrupt status.
            mapCondition.awaitUninterruptibly();
        }
    } finally {
        mapLock.unlock();
    }
}
}