blob: b8a7c2b559dc8b9dfd5b704e1dfb871f150677ef [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008 IBM Corporation.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* IBM Corporation - initial API and implementation
******************************************************************************/
package org.eclipse.ptp.rm.mpi.mpich2.core.rtsystem;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.core.runtime.CoreException;
import org.eclipse.core.runtime.IProgressMonitor;
import org.eclipse.osgi.util.NLS;
import org.eclipse.ptp.core.PTPCorePlugin;
import org.eclipse.ptp.core.attributes.AttributeManager;
import org.eclipse.ptp.core.attributes.IAttribute;
import org.eclipse.ptp.core.attributes.IntegerAttribute;
import org.eclipse.ptp.core.elements.IPJob;
import org.eclipse.ptp.core.elements.IPProcess;
import org.eclipse.ptp.core.elements.attributes.JobAttributes;
import org.eclipse.ptp.core.elements.attributes.ProcessAttributes;
import org.eclipse.ptp.core.elements.attributes.ProcessAttributes.State;
import org.eclipse.ptp.core.elements.events.IChangedProcessEvent;
import org.eclipse.ptp.core.elements.events.INewProcessEvent;
import org.eclipse.ptp.core.elements.events.IProcessChangeEvent;
import org.eclipse.ptp.core.elements.events.IRemoveProcessEvent;
import org.eclipse.ptp.core.elements.listeners.IJobChildListener;
import org.eclipse.ptp.core.elements.listeners.IProcessListener;
import org.eclipse.ptp.remote.core.IRemoteProcessBuilder;
import org.eclipse.ptp.rm.core.rtsystem.AbstractToolRuntimeSystem;
import org.eclipse.ptp.rm.core.rtsystem.AbstractToolRuntimeSystemJob;
import org.eclipse.ptp.rm.core.utils.DebugUtil;
import org.eclipse.ptp.rm.core.utils.InputStreamListenerToOutputStream;
import org.eclipse.ptp.rm.core.utils.InputStreamObserver;
import org.eclipse.ptp.rm.mpi.mpich2.core.MPICH2JobAttributes;
import org.eclipse.ptp.rm.mpi.mpich2.core.MPICH2LaunchAttributes;
import org.eclipse.ptp.rm.mpi.mpich2.core.MPICH2Plugin;
import org.eclipse.ptp.rm.mpi.mpich2.core.messages.Messages;
/**
*
* @author Daniel Felix Ferber
* @author Greg Watson
*
*/
public class MPICH2RuntimeSystemJob extends AbstractToolRuntimeSystemJob {
public Object lock1 = new Object();
private InputStreamObserver stderrObserver;
private InputStreamObserver stdoutObserver;
protected final ReentrantLock procsLock = new ReentrantLock();
protected final Condition procsCondition = procsLock.newCondition();
protected int numRunningProcs = 0;
protected IProcessListener processListener = new IProcessListener() {
/* (non-Javadoc)
* @see org.eclipse.ptp.core.elements.listeners.IProcessListener#handleEvent(org.eclipse.ptp.core.elements.events.IProcessChangeEvent)
*/
public void handleEvent(IProcessChangeEvent e) {
if (e.getAttributes().getAttribute(ProcessAttributes.getStateAttributeDefinition()) != null
&& e.getSource().getState() == ProcessAttributes.State.RUNNING) {
procsLock.lock();
try {
numRunningProcs++;
procsCondition.signalAll();
} finally {
procsLock.unlock();
}
}
}
};
protected IJobChildListener jobChildListener = new IJobChildListener() {
/* (non-Javadoc)
* @see org.eclipse.ptp.core.elements.listeners.IJobChildListener#handleEvent(org.eclipse.ptp.core.elements.events.IChangedProcessEvent)
*/
public void handleEvent(IChangedProcessEvent e) {
// ignore
}
/* (non-Javadoc)
* @see org.eclipse.ptp.core.elements.listeners.IJobChildListener#handleEvent(org.eclipse.ptp.core.elements.events.INewProcessEvent)
*/
public void handleEvent(INewProcessEvent e) {
for (IPProcess process : e.getProcesses()) {
process.addElementListener(processListener);
}
}
/* (non-Javadoc)
* @see org.eclipse.ptp.core.elements.listeners.IJobChildListener#handleEvent(org.eclipse.ptp.core.elements.events.IRemoveProcessEvent)
*/
public void handleEvent(IRemoveProcessEvent e) {
for (IPProcess process : e.getProcesses()) {
process.removeElementListener(processListener);
}
}
};
public MPICH2RuntimeSystemJob(String jobID, String queueID, String name, AbstractToolRuntimeSystem rtSystem, AttributeManager attrMgr) {
super(jobID, queueID, name, rtSystem, attrMgr);
}
private void changeAllProcessesStatus(State newState) {
final MPICH2RuntimeSystem rtSystem = (MPICH2RuntimeSystem) getRtSystem();
final IPJob ipJob = PTPCorePlugin.getDefault().getUniverse().getResourceManager(rtSystem.getRmID()).getQueueById(getQueueID()).getJobById(getJobID());
/*
* Mark all running and starting processes as finished.
*/
List<String> ids = new ArrayList<String>();
for (IPProcess ipProcess : ipJob.getProcesses()) {
switch (ipProcess.getState()) {
case EXITED:
case ERROR:
case EXITED_SIGNALLED:
break;
case RUNNING:
case STARTING:
case SUSPENDED:
case UNKNOWN:
ids.add(ipProcess.getID());
break;
}
}
AttributeManager attrMrg = new AttributeManager();
attrMrg.addAttribute(ProcessAttributes.getStateAttributeDefinition().create(newState));
for (String processId : ids) {
rtSystem.changeProcess(processId, attrMrg);
}
}
@Override
protected void doBeforeExecution(IProgressMonitor monitor, IRemoteProcessBuilder builder) throws CoreException {
final MPICH2RuntimeSystem rtSystem = (MPICH2RuntimeSystem) getRtSystem();
final IPJob ipJob = PTPCorePlugin.getDefault().getUniverse().getResourceManager(rtSystem.getRmID()).getQueueById(getQueueID()).getJobById(getJobID());
ipJob.addChildListener(jobChildListener);
}
@Override
protected void doExecutionCleanUp(IProgressMonitor monitor) {
if (getProcess() != null) {
getProcess().destroy();
}
if (stderrObserver != null) {
stderrObserver.kill();
stderrObserver = null;
}
if (stdoutObserver != null) {
stdoutObserver.kill();
stdoutObserver = null;
}
// TODO: more cleanup?
changeAllProcessesStatus(ProcessAttributes.State.EXITED);
}
@Override
protected JobAttributes.State doExecutionFinished(IProgressMonitor monitor) throws CoreException {
changeAllProcessesStatus(ProcessAttributes.State.EXITED);
if (getProcess().exitValue() != 0) {
if (!terminateJobFlag) {
changeJobStatusMessage(NLS.bind(Messages.MPICH2RuntimeSystemJob_Exception_ExecutionFailedWithExitValue, new Integer(getProcess().exitValue())));
return JobAttributes.State.ERROR;
}
DebugUtil.trace(DebugUtil.RTS_JOB_TRACING, "RTS job #{0}: ignoring exit value {1} because job was forced to terminate by user", getJobID(), new Integer(getProcess().exitValue())); //$NON-NLS-1$
}
return JobAttributes.State.TERMINATED;
}
@Override
protected void doExecutionStarted(IProgressMonitor monitor) throws CoreException {
final MPICH2RuntimeSystem rtSystem = (MPICH2RuntimeSystem) getRtSystem();
final IPJob ipJob = PTPCorePlugin.getDefault().getUniverse().getResourceManager(rtSystem.getRmID()).getQueueById(getQueueID()).getJobById(getJobID());
/*
* Listener that saves stdout.
*/
final PipedOutputStream stdoutOutputStream = new PipedOutputStream();
final PipedInputStream stdoutInputStream = new PipedInputStream();
try {
stdoutInputStream.connect(stdoutOutputStream);
} catch (IOException e) {
assert false; // This exception is not possible
}
final InputStreamListenerToOutputStream stdoutPipedStreamListener = new InputStreamListenerToOutputStream(stdoutOutputStream);
Thread stdoutThread = new Thread() {
@Override
public void run() {
DebugUtil.trace(DebugUtil.RTS_JOB_TRACING_MORE, "RTS job #{0}: stdout thread: started", getJobID()); //$NON-NLS-1$
BufferedReader stdoutBufferedReader = new BufferedReader(new InputStreamReader(stdoutInputStream));
try {
String line = stdoutBufferedReader.readLine();
while (line != null) {
int index = 0;
int pos = line.indexOf(": "); //$NON-NLS-1$
if (pos > 0) {
try {
index = Integer.parseInt(line.substring(0, pos));
line = line.substring(pos+1);
} catch (NumberFormatException e) {
// ignore
}
}
synchronized (lock1) {
IPProcess ipProc = ipJob.getProcessByIndex(index);
if (ipProc != null) {
ipProc.addAttribute(ProcessAttributes.getStdoutAttributeDefinition().create(line));
}
DebugUtil.trace(DebugUtil.RTS_JOB_OUTPUT_TRACING, "RTS job #{0}:> {1}", getJobID(), line); //$NON-NLS-1$
}
line = stdoutBufferedReader.readLine();
}
} catch (IOException e) {
DebugUtil.trace(DebugUtil.RTS_JOB_TRACING_MORE, "RTS job #{0}: stdout thread: {0}", e); //$NON-NLS-1$
MPICH2Plugin.log(e);
} finally {
stdoutPipedStreamListener.disable();
}
DebugUtil.trace(DebugUtil.RTS_JOB_TRACING_MORE, "RTS job #{0}: stdout thread: finished", getJobID()); //$NON-NLS-1$
}
};
/*
* Listener that saves stderr.
*/
final PipedOutputStream stderrOutputStream = new PipedOutputStream();
final PipedInputStream stderrInputStream = new PipedInputStream();
try {
stderrInputStream.connect(stderrOutputStream);
} catch (IOException e) {
assert false; // This exception is not possible
}
final InputStreamListenerToOutputStream stderrPipedStreamListener = new InputStreamListenerToOutputStream(stderrOutputStream);
Thread stderrThread = new Thread() {
@Override
public void run() {
DebugUtil.trace(DebugUtil.RTS_JOB_TRACING_MORE, "RTS job #{0}: stderr thread: started", getJobID()); //$NON-NLS-1$
final BufferedReader stderrBufferedReader = new BufferedReader(new InputStreamReader(stderrInputStream));
try {
String line = stderrBufferedReader.readLine();
while (line != null) {
int index = 0;
int pos = line.indexOf(": "); //$NON-NLS-1$
if (pos > 0) {
try {
index = Integer.parseInt(line.substring(0, pos));
line = line.substring(pos+1);
} catch (NumberFormatException e) {
// ignore
}
}
synchronized (lock1) {
IPProcess ipProc = ipJob.getProcessByIndex(index);
if (ipProc != null) {
ipProc.addAttribute(ProcessAttributes.getStderrAttributeDefinition().create(line));
}
DebugUtil.error(DebugUtil.RTS_JOB_OUTPUT_TRACING, "RTS job #{0}:> {1}", getJobID(), line); //$NON-NLS-1$
}
line = stderrBufferedReader.readLine();
}
} catch (IOException e) {
DebugUtil.trace(DebugUtil.RTS_JOB_TRACING_MORE, "RTS job #{0}: stderr thread: {1}", getJobID(), e); //$NON-NLS-1$
MPICH2Plugin.log(e);
} finally {
stderrPipedStreamListener.disable();
}
DebugUtil.trace(DebugUtil.RTS_JOB_TRACING_MORE, "RTS job #{0}: stderr thread: finished", getJobID()); //$NON-NLS-1$
}
};
DebugUtil.trace(DebugUtil.RTS_JOB_TRACING_MORE, "RTS job #{0}: starting all threads", getJobID()); //$NON-NLS-1$
/*
* Create and start listeners.
*/
stdoutThread.start();
stderrThread.start();
stderrObserver = new InputStreamObserver(getProcess().getErrorStream());
stdoutObserver = new InputStreamObserver(getProcess().getInputStream());
stdoutObserver.addListener(stdoutPipedStreamListener);
stderrObserver.addListener(stderrPipedStreamListener);
stderrObserver.start();
stdoutObserver.start();
/*
* At this point we need to pause until all processes have started. This is because
* the model semantics are such that the job state must not be set to RUNNING until
* all the job's processes (if there are any) have been created and also set to RUNNING.
*/
/*
* We know that a MPICH2 job has a number of processes attribute
*/
int numProcs = 1;
IntegerAttribute numProcsAttr = getAttrMgr().getAttribute(JobAttributes.getNumberOfProcessesAttributeDefinition());
if (numProcsAttr != null) {
numProcs = numProcsAttr.getValue().intValue();
}
procsLock.lock();
try {
while (!monitor.isCanceled() && numRunningProcs < numProcs) {
try {
procsCondition.await(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// ignore
}
}
} finally {
procsLock.unlock();
}
}
@Override
protected void doPrepareExecution(IProgressMonitor monitor) throws CoreException {
// Nothing to do
}
@Override
protected IAttribute<?, ?, ?>[] doRetrieveToolBaseSubstitutionAttributes() throws CoreException {
// TODO make macros available for environment variables and work directory.
return null;
}
@Override
protected IAttribute<?, ?, ?>[] doRetrieveToolCommandSubstitutionAttributes(
AttributeManager baseSubstitutionAttributeManager,
String directory, Map<String, String> environment) {
List<IAttribute<?, ?, ?>> newAttributes = new ArrayList<IAttribute<?,?,?>>();
/*
* An MPICH2 specific attribute.
* Attribute that contains a list of names of environment variables.
*/
int p = 0;
String keys[] = new String[environment.size()];
for (String key : environment.keySet()) {
keys[p++] = key;
}
newAttributes.add(MPICH2LaunchAttributes.getEnvironmentKeysAttributeDefinition().create(keys));
/*
* An MPICH2 specific attribute.
* A shortcut that generates arguments for the MPICH2 run command.
*/
newAttributes.add(MPICH2LaunchAttributes.getEnvironmentArgsAttributeDefinition().create());
/*
* The jobid is used to alias the MPICH2 job so that it can be matched later.
*/
newAttributes.add(MPICH2JobAttributes.getJobIdAttributeDefinition().create(getJobID()));
return newAttributes.toArray(new IAttribute<?, ?, ?>[newAttributes.size()]);
}
@Override
protected HashMap<String, String> doRetrieveToolEnvironment()
throws CoreException {
// No extra environment variables needs to be set for MPICH2.
return null;
}
@Override
protected void doTerminateJob() {
// Empty implementation.
}
@Override
protected void doWaitExecution(IProgressMonitor monitor) throws CoreException {
/*
* Wait until both stdout and stderr stop because stream are closed.
* This means that the process has finished.
*/
DebugUtil.trace(DebugUtil.RTS_JOB_TRACING_MORE, "RTS job #{0}: waiting stderr thread to finish", getJobID()); //$NON-NLS-1$
try {
stderrObserver.join();
} catch (InterruptedException e1) {
// Ignore
}
DebugUtil.trace(DebugUtil.RTS_JOB_TRACING_MORE, "RTS job #{0}: waiting stdout thread to finish", getJobID()); //$NON-NLS-1$
try {
stdoutObserver.join();
} catch (InterruptedException e1) {
// Ignore
}
/*
* Still experience has shown that remote process might not have yet terminated, although stdout and stderr is closed.
*/
DebugUtil.trace(DebugUtil.RTS_JOB_TRACING_MORE, "RTS job #{0}: waiting mpi process to finish completely", getJobID()); //$NON-NLS-1$
try {
getProcess().waitFor();
} catch (InterruptedException e) {
// Ignore
}
DebugUtil.trace(DebugUtil.RTS_JOB_TRACING_MORE, "RTS job #{0}: completely finished", getJobID()); //$NON-NLS-1$
}
}