blob: 0b38881d83eb3d2b274a3bc092b9873d15001560 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* IBM Corporation - initial API and implementation
*******************************************************************************/
package org.eclipse.ptp.rm.mpi.mpich2.core.rtsystem;
import java.io.BufferedReader;
import java.util.List;
import org.eclipse.core.runtime.CoreException;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
import org.eclipse.osgi.util.NLS;
import org.eclipse.ptp.core.PTPCorePlugin;
import org.eclipse.ptp.core.attributes.AttributeManager;
import org.eclipse.ptp.core.elementcontrols.IPProcessControl;
import org.eclipse.ptp.core.elements.IPJob;
import org.eclipse.ptp.core.elements.IPMachine;
import org.eclipse.ptp.core.elements.IPQueue;
import org.eclipse.ptp.core.elements.IResourceManager;
import org.eclipse.ptp.core.elements.attributes.MachineAttributes;
import org.eclipse.ptp.core.elements.attributes.ProcessAttributes;
import org.eclipse.ptp.rm.core.ToolsRMPlugin;
import org.eclipse.ptp.rm.core.rtsystem.AbstractRemoteCommandJob;
import org.eclipse.ptp.rm.mpi.mpich2.core.MPICH2MachineAttributes;
import org.eclipse.ptp.rm.mpi.mpich2.core.MPICH2Plugin;
import org.eclipse.ptp.rm.mpi.mpich2.core.messages.Messages;
/**
 * Monitoring job for the MPICH2 runtime system.
 * <p>
 * The job is constructed with the periodic monitor command and the periodic
 * monitor time taken from the runtime system's effective tool RM configuration.
 * {@link #parse(BufferedReader)} reads the output of that command (the
 * <code>mpdlistjobs</code> listing) and creates, in the model, every process of
 * a known job that does not exist yet.
 *
 * @author Greg Watson
 *
 */
public class MPICH2PeriodicJob extends AbstractRemoteCommandJob {
    /** Runtime system whose model is updated by this job. */
    MPICH2RuntimeSystem rts;

    /**
     * Creates the job for the given runtime system.
     *
     * @param rts
     *            runtime system that supplies the job name, the monitor command,
     *            the monitor interval and the model to update
     */
    public MPICH2PeriodicJob(MPICH2RuntimeSystem rts) {
        super(rts,
                NLS.bind(Messages.MPICH2MonitorJob_name, rts.getRmConfiguration().getName()),
                rts.retrieveEffectiveToolRmConfiguration().getPeriodicMonitorCmd(),
                Messages.MPICH2MonitorJob_interruptedErrorMessage,
                Messages.MPICH2MonitorJob_processErrorMessage,
                Messages.MPICH2MonitorJob_parsingErrorMessage,
                rts.retrieveEffectiveToolRmConfiguration().getPeriodicMonitorTime());
        this.rts = rts;
    }

    /**
     * Parses the output of the monitor command and creates model processes for
     * every listed process that belongs to one of the queue's jobs and is not
     * yet present in the model. Newly created processes are set to
     * {@link ProcessAttributes.State#RUNNING}.
     * <p>
     * If the resource manager, machine or queue cannot be found in the model,
     * the method returns without doing anything.
     *
     * @param output
     *            reader over the output of the monitor command
     * @throws CoreException
     *             if the output cannot be parsed, a listed host name does not map
     *             to a known node, or any other exception occurs while updating
     *             the model (wrapped, with the original exception as cause). For
     *             errors, the machine is put into the error state before the
     *             exception is thrown.
     */
    @Override
    protected void parse(BufferedReader output) throws CoreException {
        /*
         * An MPI resource manager has only one machine and one queue.
         * Therefore they are implicitly "discovered".
         */
        IResourceManager rm = PTPCorePlugin.getDefault().getUniverse().getResourceManager(rts.getRmID());
        /*
         * We may be called before the model has been set up properly. Do nothing
         * if this is the case.
         */
        if (rm == null) {
            return;
        }
        IPMachine machine = rm.getMachineById(rts.getMachineID());
        IPQueue queue = rm.getQueueById(rts.getQueueID());
        if (machine == null || queue == null) {
            return;
        }

        /*
         * Any exception from now on is caught in order to add the error message
         * as an attribute to the machine. Then, the exception is re-thrown.
         */
        try {
            /*
             * Parse output of mpdlistjobs command.
             */
            MPICH2ListJobsParser parser = new MPICH2ListJobsParser();
            MPICH2JobMap jobMap = parser.parse(output);
            if (jobMap == null) {
                throw new CoreException(new Status(IStatus.ERROR, MPICH2Plugin.getDefault().getBundle().getSymbolicName(), parser.getErrorMessage()));
            }

            /*
             * Update model according to data: create any process that is listed
             * but not yet known.
             */
            for (List<MPICH2JobMap.Job> jobs : jobMap.getJobs()) {
                for (MPICH2JobMap.Job job : jobs) {
                    IPJob pJob = queue.getJobById(job.getJobAlias());
                    if (pJob == null) {
                        // Not one of our jobs
                        continue;
                    }

                    IPProcessControl process = (IPProcessControl) pJob.getProcessByIndex(job.getRank());
                    if (process != null) {
                        // process already exists, don't need to do anything
                        continue;
                    }

                    String nodeID = rts.getNodeIDforName(job.getHost());
                    if (nodeID == null) {
                        throw new CoreException(new Status(IStatus.ERROR, ToolsRMPlugin.getDefault().getBundle().getSymbolicName(), Messages.MPICH2RuntimeSystemJob_Exception_HostnamesDoNotMatch, null));
                    }

                    String processID = rts.createProcess(job.getJobAlias(), job.getRank(), nodeID);
                    process = (IPProcessControl) pJob.getProcessById(processID);
                    if (process == null) {
                        // Fail with a meaningful message instead of a bare NullPointerException;
                        // handled by the generic catch below like any other internal error.
                        throw new IllegalStateException("Process " + processID + " not found in job " + job.getJobAlias() + " after creation"); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
                    }
                    process.setState(ProcessAttributes.State.RUNNING);
                }
            }
        } catch (CoreException e) {
            /*
             * Show message of core exception and change machine status to error.
             * Non-error statuses are passed on without touching the machine.
             */
            if (e.getStatus().getSeverity() == IStatus.ERROR) {
                reportMachineError(machine, NLS.bind(Messages.MPICH2MonitorJob_Exception_CommandFailed, e.getMessage()));
            }
            throw e;
        } catch (Exception e) {
            /*
             * Show message of all other exceptions and change machine status to
             * error. Exceptions such as NullPointerException often carry no
             * message, so fall back to the exception's string form.
             */
            String detail = e.getMessage() != null ? e.getMessage() : e.toString();
            String message = NLS.bind(Messages.MPICH2MonitorJob_Exception_InternalError, detail);
            reportMachineError(machine, message);
            // Re-throw as CoreException (the only checked type parse may throw), keeping the cause.
            throw new CoreException(new Status(IStatus.ERROR, MPICH2Plugin.getDefault().getBundle().getSymbolicName(), message, e));
        }
    }

    /**
     * Puts the machine into the error state and attaches a status message to it.
     *
     * @param machine
     *            machine whose attributes are changed
     * @param message
     *            text stored in the machine's status message attribute
     */
    private void reportMachineError(IPMachine machine, String message) {
        AttributeManager attrManager = new AttributeManager();
        attrManager.addAttribute(MachineAttributes.getStateAttributeDefinition().create(MachineAttributes.State.ERROR));
        attrManager.addAttribute(MPICH2MachineAttributes.getStatusMessageAttributeDefinition().create(message));
        rts.changeMachine(machine.getID(), attrManager);
    }
}