blob: b71007527ddc62db366350a804562d2f13172c4a [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2006 The Regents of the University of California.
* This material was produced under U.S. Government contract W-7405-ENG-36
* for Los Alamos National Laboratory, which is operated by the University
* of California for the U.S. Department of Energy. The U.S. Government has
* rights to use, reproduce, and distribute this software. NEITHER THE
* GOVERNMENT NOR THE UNIVERSITY MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR
* ASSUMES ANY LIABILITY FOR THE USE OF THIS SOFTWARE. If software is modified
* to produce derivative works, such modified software should be clearly marked,
* so as not to confuse it with the version available from LANL.
*
* Additionally, this program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* LA-CC 04-115
*******************************************************************************/
package org.eclipse.ptp.rmsystem;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.core.runtime.CoreException;
import org.eclipse.core.runtime.IProgressMonitor;
import org.eclipse.core.runtime.NullProgressMonitor;
import org.eclipse.ptp.core.attributes.AttributeManager;
import org.eclipse.ptp.core.attributes.IAttributeDefinition;
import org.eclipse.ptp.core.attributes.StringAttribute;
import org.eclipse.ptp.core.elementcontrols.IPJobControl;
import org.eclipse.ptp.core.elementcontrols.IPMachineControl;
import org.eclipse.ptp.core.elementcontrols.IPNodeControl;
import org.eclipse.ptp.core.elementcontrols.IPProcessControl;
import org.eclipse.ptp.core.elementcontrols.IPQueueControl;
import org.eclipse.ptp.core.elementcontrols.IPUniverseControl;
import org.eclipse.ptp.core.elements.IPJob;
import org.eclipse.ptp.core.elements.IResourceManager;
import org.eclipse.ptp.core.elements.attributes.ElementAttributeManager;
import org.eclipse.ptp.core.elements.attributes.JobAttributes;
import org.eclipse.ptp.core.elements.attributes.MessageAttributes;
import org.eclipse.ptp.core.elements.attributes.ResourceManagerAttributes;
import org.eclipse.ptp.core.util.RangeSet;
import org.eclipse.ptp.rtsystem.IRuntimeEventListener;
import org.eclipse.ptp.rtsystem.IRuntimeSystem;
import org.eclipse.ptp.rtsystem.events.IRuntimeAttributeDefinitionEvent;
import org.eclipse.ptp.rtsystem.events.IRuntimeConnectedStateEvent;
import org.eclipse.ptp.rtsystem.events.IRuntimeErrorStateEvent;
import org.eclipse.ptp.rtsystem.events.IRuntimeEvent;
import org.eclipse.ptp.rtsystem.events.IRuntimeJobChangeEvent;
import org.eclipse.ptp.rtsystem.events.IRuntimeMachineChangeEvent;
import org.eclipse.ptp.rtsystem.events.IRuntimeMessageEvent;
import org.eclipse.ptp.rtsystem.events.IRuntimeNewJobEvent;
import org.eclipse.ptp.rtsystem.events.IRuntimeNewMachineEvent;
import org.eclipse.ptp.rtsystem.events.IRuntimeNewNodeEvent;
import org.eclipse.ptp.rtsystem.events.IRuntimeNewProcessEvent;
import org.eclipse.ptp.rtsystem.events.IRuntimeNewQueueEvent;
import org.eclipse.ptp.rtsystem.events.IRuntimeNodeChangeEvent;
import org.eclipse.ptp.rtsystem.events.IRuntimeProcessChangeEvent;
import org.eclipse.ptp.rtsystem.events.IRuntimeQueueChangeEvent;
import org.eclipse.ptp.rtsystem.events.IRuntimeRemoveAllEvent;
import org.eclipse.ptp.rtsystem.events.IRuntimeRemoveJobEvent;
import org.eclipse.ptp.rtsystem.events.IRuntimeRemoveMachineEvent;
import org.eclipse.ptp.rtsystem.events.IRuntimeRemoveNodeEvent;
import org.eclipse.ptp.rtsystem.events.IRuntimeRemoveProcessEvent;
import org.eclipse.ptp.rtsystem.events.IRuntimeRemoveQueueEvent;
import org.eclipse.ptp.rtsystem.events.IRuntimeRunningStateEvent;
import org.eclipse.ptp.rtsystem.events.IRuntimeShutdownStateEvent;
import org.eclipse.ptp.rtsystem.events.IRuntimeStartupErrorEvent;
import org.eclipse.ptp.rtsystem.events.IRuntimeSubmitJobErrorEvent;
import org.eclipse.ptp.rtsystem.events.IRuntimeTerminateJobErrorEvent;
public abstract class AbstractRuntimeResourceManager extends
AbstractResourceManager implements IRuntimeEventListener {
public enum JobSubState {SUBMITTED, COMPLETED, ERROR}
private class JobSubmission {
private IPJob job = null;
private JobSubState state = JobSubState.SUBMITTED;
private IRuntimeEvent event;
/**
* @return the event
*/
public IRuntimeEvent getEvent() {
return event;
}
/**
* @return the job
*/
public IPJob getJob() {
return job;
}
/**
* @return the state
*/
public JobSubState getState() {
return state;
}
/**
* @param event the event to set
*/
public void setEvent(IRuntimeEvent event) {
this.event = event;
}
/**
* @param job the job to set
*/
public void setJob(IPJob job) {
this.job = job;
}
/**
* @param error the error to set
*/
public void setState(JobSubState state) {
this.state = state;
}
}
private enum RMState {STARTING, STARTED, STOPPING, STOPPED, ERROR}
private IRuntimeSystem runtimeSystem;
private RMState state;
private final ReentrantLock stateLock = new ReentrantLock();;
private final Condition stateCondition = stateLock.newCondition();
private final ReentrantLock subLock = new ReentrantLock();
private final Condition subCondition = subLock.newCondition();;
private Map<String, JobSubmission> jobSubmissions = new HashMap<String, JobSubmission>();
public AbstractRuntimeResourceManager(String id, IPUniverseControl universe,
IResourceManagerConfiguration config) {
super(id, universe, config);
state = RMState.STOPPED;
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rtsystem.IRuntimeEventListener#handleEvent(org.eclipse.ptp.rtsystem.events.IRuntimeAttributeDefinitionEvent)
*
* Note: this allows redefinition of attribute definitions. This is ok as long as they
* are only allowed during the initialization phase.
*/
public void handleEvent(IRuntimeAttributeDefinitionEvent e) {
for (IAttributeDefinition<?,?,?> attr : e.getDefinitions()) {
getAttributeDefinitionManager().setAttributeDefinition(attr);
}
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rtsystem.IRuntimeEventListener#handleEvent(org.eclipse.ptp.rtsystem.events.IRuntimeConnectedStateEvent)
*/
public void handleEvent(IRuntimeConnectedStateEvent e) {
// Ignore
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rtsystem.IRuntimeEventListener#handleEvent(org.eclipse.ptp.rtsystem.events.IRuntimeErrorStateEvent)
*/
public void handleEvent(IRuntimeErrorStateEvent e) {
/*
* Fatal error in the runtime system. Cancel any pending job submissions
* and inform upper levels of the problem.
*/
subLock.lock();
try {
for (JobSubmission sub : jobSubmissions.values()) {
sub.setState(JobSubState.ERROR);
sub.setEvent(e);
}
subCondition.signalAll();
} finally {
subLock.unlock();
}
stateLock.lock();
try {
RMState oldState = state;
state = RMState.ERROR;
if (oldState == RMState.STOPPING) {
stateCondition.signal();
}
setState(ResourceManagerAttributes.State.ERROR);
cleanUp();
} finally {
stateLock.unlock();
}
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rtsystem.IRuntimeEventListener#handleEvent(org.eclipse.ptp.rtsystem.events.IRuntimeJobChangeEvent)
*/
public void handleEvent(IRuntimeJobChangeEvent e) {
ElementAttributeManager eMgr = e.getElementAttributeManager();
Map<IPQueueControl, List<IPJobControl>> map =
new HashMap<IPQueueControl, List<IPJobControl>>();
for (Map.Entry<RangeSet, AttributeManager> mgrEntry : eMgr.getEntrySet()) {
AttributeManager attrs = mgrEntry.getValue();
RangeSet jobIds = mgrEntry.getKey();
List<IPJobControl> changedJobs;
for (String elementId : jobIds) {
IPJobControl job = getJobControl(elementId);
if (job != null) {
IPQueueControl queue = job.getQueueControl();
changedJobs = map.get(queue);
if (changedJobs == null) {
changedJobs = new ArrayList<IPJobControl>();
map.put(queue, changedJobs);
}
changedJobs.add(job);
} else {
System.out.println("JobChange: unknown node " + elementId);
}
}
for (Map.Entry<IPQueueControl, List<IPJobControl>> entry : map.entrySet()) {
doUpdateJobs(entry.getKey(), entry.getValue(), attrs);
}
map.clear();
}
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rtsystem.IRuntimeEventListener#handleEvent(org.eclipse.ptp.rtsystem.events.IRuntimeMachineChangeEvent)
*/
public void handleEvent(IRuntimeMachineChangeEvent e) {
ElementAttributeManager eMgr = e.getElementAttributeManager();
List<IPMachineControl> machines = new ArrayList<IPMachineControl>();
for (Map.Entry<RangeSet, AttributeManager> entry : eMgr.getEntrySet()) {
AttributeManager attrs = entry.getValue();
RangeSet machineIds = entry.getKey();
for (String elementId : machineIds) {
IPMachineControl machine = getMachineControl(elementId);
if (machine != null) {
machines.add(machine);
} else {
System.out.println("MachineChange: unknown machine " + elementId);
}
}
doUpdateMachines(machines, attrs);
machines.clear();
}
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rtsystem.IRuntimeEventListener#handleEvent(org.eclipse.ptp.rtsystem.events.IRuntimeErrorEvent)
*/
public void handleEvent(IRuntimeMessageEvent e) {
MessageAttributes.Level level = e.getLevel();
// FIXME: implement logging
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rtsystem.IRuntimeEventListener#handleEvent(org.eclipse.ptp.rtsystem.events.IRuntimeNewJobEvent)
*/
public void handleEvent(IRuntimeNewJobEvent e) {
IPQueueControl queue = getQueueControl(e.getParentId());
ElementAttributeManager mgr = e.getElementAttributeManager();
for (Map.Entry<RangeSet, AttributeManager> entry : mgr.getEntrySet()) {
AttributeManager jobAttrs = entry.getValue();
RangeSet jobIds = entry.getKey();
List<IPJobControl> newJobs = new ArrayList<IPJobControl>(jobIds.size());
for (String elementId : jobIds) {
IPJobControl job = getJobControl(elementId);
if (job == null) {
job = doCreateJob(queue, elementId, jobAttrs);
newJobs.add(job);
StringAttribute jobSubAttr =
(StringAttribute) jobAttrs.getAttribute(JobAttributes.getSubIdAttributeDefinition());
if (jobSubAttr != null) {
/*
* Notify any submitJob() calls that the job has been created
*/
subLock.lock();
try {
JobSubmission sub = jobSubmissions.get(jobSubAttr.getValue());
if (sub != null && sub.getState() == JobSubState.SUBMITTED) {
sub.setJob(job);
sub.setState(JobSubState.COMPLETED);
subCondition.signalAll();
}
} finally {
subLock.unlock();
}
}
}
}
addJobs(queue, newJobs);
}
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rtsystem.IRuntimeEventListener#handleEvent(org.eclipse.ptp.rtsystem.events.IRuntimeNewMachineEvent)
*/
public void handleEvent(IRuntimeNewMachineEvent e) {
ElementAttributeManager mgr = e.getElementAttributeManager();
for (Map.Entry<RangeSet, AttributeManager> entry : mgr.getEntrySet()) {
AttributeManager attrs = entry.getValue();
RangeSet machineIds = entry.getKey();
List<IPMachineControl> newMachines = new ArrayList<IPMachineControl>(machineIds.size());
for (String elementId : machineIds) {
IPMachineControl machine = getMachineControl(elementId);
if (machine == null) {
machine = doCreateMachine(elementId, attrs);
newMachines.add(machine);
}
}
addMachines(newMachines);
}
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rtsystem.IRuntimeEventListener#handleEvent(org.eclipse.ptp.rtsystem.events.IRuntimeNewNodeEvent)
*/
public void handleEvent(IRuntimeNewNodeEvent e) {
IPMachineControl machine = getMachineControl(e.getParentId());
if (machine != null) {
ElementAttributeManager mgr = e.getElementAttributeManager();
for (Map.Entry<RangeSet, AttributeManager> entry : mgr.getEntrySet()) {
AttributeManager attrs = entry.getValue();
RangeSet nodeIds = entry.getKey();
List<IPNodeControl> newNodes = new ArrayList<IPNodeControl>(nodeIds.size());
for (String elementId : nodeIds) {
IPNodeControl node = getNodeControl(elementId);
if (node == null) {
node = doCreateNode(machine, elementId, attrs);
newNodes.add(node);
}
}
addNodes(machine, newNodes);
}
}
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rtsystem.IRuntimeEventListener#handleEvent(org.eclipse.ptp.rtsystem.events.IRuntimeNewProcessEvent)
*/
public void handleEvent(IRuntimeNewProcessEvent e) {
IPJobControl job = getJobControl(e.getParentId());
if (job != null) {
ElementAttributeManager mgr = e.getElementAttributeManager();
for (Map.Entry<RangeSet, AttributeManager> entry : mgr.getEntrySet()) {
AttributeManager attrs = entry.getValue();
RangeSet processIds = entry.getKey();
List<IPProcessControl> newProcesses = new ArrayList<IPProcessControl>(processIds.size());
for (String elementId : processIds) {
IPProcessControl process = getProcessControl(elementId);
if (process == null) {
process = doCreateProcess(job, elementId, attrs);
newProcesses.add(process);
}
}
addProcesses(job, newProcesses);
}
}
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rtsystem.IRuntimeEventListener#handleEvent(org.eclipse.ptp.rtsystem.events.IRuntimeNewQueueEvent)
*/
public void handleEvent(IRuntimeNewQueueEvent e) {
ElementAttributeManager mgr = e.getElementAttributeManager();
for (Map.Entry<RangeSet, AttributeManager> entry : mgr.getEntrySet()) {
AttributeManager attrs = entry.getValue();
RangeSet queueIds = entry.getKey();
List<IPQueueControl> newQueues = new ArrayList<IPQueueControl>(queueIds.size());
for (String elementId : queueIds) {
IPQueueControl queue = getQueueControl(elementId);
if (queue == null) {
queue = doCreateQueue(elementId, attrs);
newQueues.add(queue);
}
}
addQueues(newQueues);
}
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rtsystem.IRuntimeEventListener#handleEvent(org.eclipse.ptp.rtsystem.events.IRuntimeNodeChangeEvent)
*/
public void handleEvent(IRuntimeNodeChangeEvent e) {
ElementAttributeManager eMgr = e.getElementAttributeManager();
Map<IPMachineControl, List<IPNodeControl>> map =
new HashMap<IPMachineControl, List<IPNodeControl>>();
for (Map.Entry<RangeSet, AttributeManager> mgrEntry : eMgr.getEntrySet()) {
AttributeManager attrs = mgrEntry.getValue();
RangeSet nodeIds = mgrEntry.getKey();
List<IPNodeControl> changedNodes;
for (String elementId : nodeIds) {
IPNodeControl node = getNodeControl(elementId);
if (node != null) {
IPMachineControl machine = node.getMachineControl();
changedNodes = map.get(machine);
if (changedNodes == null) {
changedNodes = new ArrayList<IPNodeControl>();
map.put(machine, changedNodes);
}
changedNodes.add(node);
} else {
System.out.println("NodeChange: unknown node " + elementId);
}
}
for (Map.Entry<IPMachineControl, List<IPNodeControl>> entry : map.entrySet()) {
doUpdateNodes(entry.getKey(), entry.getValue(), attrs);
}
map.clear();
}
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rtsystem.IRuntimeEventListener#handleEvent(org.eclipse.ptp.rtsystem.events.IRuntimeProcessChangeEvent)
*/
public void handleEvent(IRuntimeProcessChangeEvent e) {
ElementAttributeManager eMgr = e.getElementAttributeManager();
Map<IPJobControl, List<IPProcessControl>> map =
new HashMap<IPJobControl, List<IPProcessControl>>();
for (Map.Entry<RangeSet, AttributeManager> mgrEntry : eMgr.getEntrySet()) {
AttributeManager attrs = mgrEntry.getValue();
RangeSet processIds = mgrEntry.getKey();
List<IPProcessControl> changedProcesses;
for (String elementId : processIds) {
IPProcessControl process = getProcessControl(elementId);
if (process != null) {
IPJobControl job = process.getJobControl();
changedProcesses = map.get(job);
if (changedProcesses == null) {
changedProcesses = new ArrayList<IPProcessControl>();
map.put(job, changedProcesses);
}
changedProcesses.add(process);
} else {
System.out.println("ProcessChange: unknown process " + elementId);
}
}
for (Map.Entry<IPJobControl, List<IPProcessControl>> entry : map.entrySet()) {
doUpdateProcesses(entry.getKey(), entry.getValue(), attrs);
}
map.clear();
}
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rtsystem.IRuntimeEventListener#handleEvent(org.eclipse.ptp.rtsystem.events.IRuntimeQueueChangeEvent)
*/
public void handleEvent(IRuntimeQueueChangeEvent e) {
ElementAttributeManager eMgr = e.getElementAttributeManager();
List<IPQueueControl> queues = new ArrayList<IPQueueControl>();
for (Map.Entry<RangeSet, AttributeManager> entry : eMgr.getEntrySet()) {
AttributeManager attrs = entry.getValue();
RangeSet queueIds = entry.getKey();
for (String elementId : queueIds) {
IPQueueControl queue = getQueueControl(elementId);
if (queue != null) {
queues.add(queue);
} else {
System.out.println("QueueChange: unknown queue " + elementId);
}
}
doUpdateQueues(queues, attrs);
queues.clear();
}
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rtsystem.IRuntimeEventListener#handleEvent(org.eclipse.ptp.rtsystem.events.IRuntimeRemoveAllEvent)
*/
public void handleEvent(IRuntimeRemoveAllEvent e) {
cleanUp();
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rtsystem.IRuntimeEventListener#handleEvent(org.eclipse.ptp.rtsystem.events.IRuntimeRemoveJobEvent)
*/
public void handleEvent(IRuntimeRemoveJobEvent e) {
Map<IPQueueControl, List<IPJobControl>> map =
new HashMap<IPQueueControl, List<IPJobControl>>();
for (String elementId : e.getElementIds()) {
IPJobControl job = getJobControl(elementId);
if (job != null) {
IPQueueControl queue = job.getQueueControl();
List<IPJobControl> jobs = map.get(queue);
if (jobs == null) {
jobs = new ArrayList<IPJobControl>();
map.put(queue, jobs);
}
jobs.add(job);
}
}
for (Map.Entry<IPQueueControl, List<IPJobControl>> entry : map.entrySet()) {
removeJobs(entry.getKey(), entry.getValue());
}
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rtsystem.IRuntimeEventListener#handleEvent(org.eclipse.ptp.rtsystem.events.IRuntimeRemoveMachineEvent)
*/
public void handleEvent(IRuntimeRemoveMachineEvent e) {
Map<IResourceManager, List<IPMachineControl>> map =
new HashMap<IResourceManager, List<IPMachineControl>>();
for (String elementId : e.getElementIds()) {
IPMachineControl machine = getMachineControl(elementId);
if (machine != null) {
IResourceManager rm = machine.getResourceManager();
List<IPMachineControl> machines = map.get(rm);
if (machines == null) {
machines = new ArrayList<IPMachineControl>();
map.put(rm, machines);
}
machines.add(machine);
}
}
for (Map.Entry<IResourceManager, List<IPMachineControl>> entry : map.entrySet()) {
removeMachines(entry.getKey(), entry.getValue());
}
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rtsystem.IRuntimeEventListener#handleEvent(org.eclipse.ptp.rtsystem.events.IRuntimeRemoveNodeEvent)
*/
public void handleEvent(IRuntimeRemoveNodeEvent e) {
Map<IPMachineControl, List<IPNodeControl>> map =
new HashMap<IPMachineControl, List<IPNodeControl>>();
for (String elementId : e.getElementIds()) {
IPNodeControl node = getNodeControl(elementId);
if (node != null) {
IPMachineControl machine = node.getMachineControl();
List<IPNodeControl> nodes = map.get(machine);
if (nodes == null) {
nodes = new ArrayList<IPNodeControl>();
map.put(machine, nodes);
}
nodes.add(node);
}
}
for (Map.Entry<IPMachineControl, List<IPNodeControl>> entry : map.entrySet()) {
removeNodes(entry.getKey(), entry.getValue());
}
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rtsystem.IRuntimeEventListener#handleEvent(org.eclipse.ptp.rtsystem.events.IRuntimeRemoveProcessEvent)
*/
public void handleEvent(IRuntimeRemoveProcessEvent e) {
Map<IPJobControl, List<IPProcessControl>> map =
new HashMap<IPJobControl, List<IPProcessControl>>();
for (String elementId : e.getElementIds()) {
IPProcessControl process = getProcessControl(elementId);
if (process != null) {
IPJobControl job = process.getJobControl();
List<IPProcessControl> processes = map.get(job);
if (processes == null) {
processes = new ArrayList<IPProcessControl>();
map.put(job, processes);
}
processes.add(process);
}
}
for (Map.Entry<IPJobControl, List<IPProcessControl>> entry : map.entrySet()) {
removeProcesses(entry.getKey(), entry.getValue());
}
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rtsystem.IRuntimeEventListener#handleEvent(org.eclipse.ptp.rtsystem.events.IRuntimeRemoveQueueEvent)
*/
public void handleEvent(IRuntimeRemoveQueueEvent e) {
Map<IResourceManager, List<IPQueueControl>> map =
new HashMap<IResourceManager, List<IPQueueControl>>();
for (String elementId : e.getElementIds()) {
IPQueueControl queue = getQueueControl(elementId);
if (queue != null) {
IResourceManager rm = queue.getResourceManager();
List<IPQueueControl> queues = map.get(rm);
if (queues == null) {
queues = new ArrayList<IPQueueControl>();
map.put(rm, queues);
}
queues.add(queue);
}
}
for (Map.Entry<IResourceManager, List<IPQueueControl>> entry : map.entrySet()) {
removeQueues(entry.getKey(), entry.getValue());
}
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rtsystem.IRuntimeEventListener#handleEvent(org.eclipse.ptp.rtsystem.events.IRuntimeRunningStateEvent)
*/
public void handleEvent(IRuntimeRunningStateEvent e) {
stateLock.lock();
CoreException exc = null;
try {
if (state == RMState.STARTING) {
runtimeSystem.startEvents();
state = RMState.STARTED;
stateCondition.signal();
}
} catch (CoreException ex) {
exc = ex;
} finally {
stateLock.unlock();
}
if (exc != null) {
fireError(exc.getMessage());
}
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rtsystem.IRuntimeEventListener#handleEvent(org.eclipse.ptp.rtsystem.events.IRuntimeShutdownStateEvent)
*/
public void handleEvent(IRuntimeShutdownStateEvent e) {
stateLock.lock();
try {
RMState oldState = state;
state = RMState.STOPPED;
if (oldState == RMState.STOPPING) {
stateCondition.signal();
} else {
/*
* This event has been generated by the runtime system. Let upper levels know
* that the RM has shut down.
*/
setState(ResourceManagerAttributes.State.STOPPED);
cleanUp();
}
} finally {
stateLock.unlock();
}
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rtsystem.IRuntimeEventListener#handleEvent(org.eclipse.ptp.rtsystem.events.IRuntimeStartupErrorEvent)
*/
public void handleEvent(IRuntimeStartupErrorEvent e) {
/*
* Check for errors while starting.
*/
stateLock.lock();
try {
if (state == RMState.STARTING) {
state = RMState.ERROR;
stateCondition.signal();
return;
}
} finally {
stateLock.unlock();
}
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rtsystem.IRuntimeEventListener#handleEvent(org.eclipse.ptp.rtsystem.events.IRuntimeSubmitJobErrorEvent)
*/
public void handleEvent(IRuntimeSubmitJobErrorEvent e) {
subLock.lock();
try {
/*
switch (level) {
case FATAL:
/*
* Fatal error in the proxy. Terminate all job submissions.
*
for (JobSubmission sub : jobSubmissions.values()) {
sub.setState(JobSubState.ERROR);
sub.setMessage(e);
}
subCondition.signalAll();
break;*/
if (e.getJobSubID() != null) {
JobSubmission sub = jobSubmissions.get(e.getJobSubID());
if (sub != null) {
sub.setState(JobSubState.ERROR);
sub.setEvent(e);
subCondition.signalAll();
}
}
} finally {
subLock.unlock();
}
fireError(e.getErrorMessage());
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rtsystem.IRuntimeEventListener#handleEvent(org.eclipse.ptp.rtsystem.events.IRuntimeTerminateJobErrorEvent)
*/
public void handleEvent(
IRuntimeTerminateJobErrorEvent e) {
// TODO Auto-generated method stub
}
/**
* Abort the RTS connection.
*/
private void abortConnection() {
runtimeSystem.shutdown();
}
/**
* Close the RTS connection.
*/
private void closeConnection() {
runtimeSystem.shutdown();
}
/**
* Open the RTS connection.
*
* @throws CoreException
*/
private void openConnection() throws CoreException {
runtimeSystem.startup();
}
/**
*
*/
protected abstract void doAfterCloseConnection();
/**
*
*/
protected abstract void doAfterOpenConnection();
/**
*
*/
protected abstract void doBeforeCloseConnection();
/**
*
*/
protected abstract void doBeforeOpenConnection();
/* (non-Javadoc)
* @see org.eclipse.ptp.rmsystem.AbstractResourceManager#doCleanUp()
*/
@Override
protected void doCleanUp() {
state = RMState.STOPPED;
}
/**
* Template pattern method to actually create the job.
*
* @param queue
* @param jobId
* @return
*/
abstract protected IPJobControl doCreateJob(IPQueueControl queue, String jobId, AttributeManager attrs);
/**
* Template pattern method to actually create the machine.
*
* @param machineId
* @return
*/
abstract protected IPMachineControl doCreateMachine(String machineId, AttributeManager attrs);
/**
* Template pattern method to actually create the node.
*
* @param machine
* @param nodeId
* @return
*/
abstract protected IPNodeControl doCreateNode(IPMachineControl machine, String nodeId, AttributeManager attrs);
/**
* Template pattern method to actually create the process.
*
* @param job
* @param processId
* @return
*/
abstract protected IPProcessControl doCreateProcess(IPJobControl job, String processId, AttributeManager attrs);
/**
* Template pattern method to actually create the queue.
*
* @param queueId
* @return
*/
abstract protected IPQueueControl doCreateQueue(String queueId, AttributeManager attrs);
/**
* create a new runtime system
* @return the new runtime system
* @throws CoreException TODO
*/
protected abstract IRuntimeSystem doCreateRuntimeSystem()
throws CoreException;
/* (non-Javadoc)
* @see org.eclipse.ptp.rmsystem.AbstractResourceManager#doDisableEvents()
*/
protected void doDisableEvents() {
// TODO Auto-generated method stub
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rmsystem.AbstractResourceManager#doDispose()
*/
@Override
protected void doDispose() {
// TODO Auto-generated method stub
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rmsystem.AbstractResourceManager#doEnableEvents()
*/
protected void doEnableEvents() {
// TODO Auto-generated method stub
}
protected List<IPJobControl> doRemoveTerminatedJobs(IPQueueControl queue) {
List<IPJobControl> terminatedJobs = new ArrayList<IPJobControl>();
if (queue != null) {
for (IPJobControl job : queue.getJobControls()) {
if (job.isTerminated()) {
terminatedJobs.add(job);
}
}
queue.removeJobs(terminatedJobs);
}
return terminatedJobs;
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rmsystem.AbstractResourceManager#doShutdown()
*/
protected void doShutdown(IProgressMonitor monitor) throws CoreException {
if (monitor == null) {
monitor = new NullProgressMonitor();
}
stateLock.lock();
try {
doBeforeCloseConnection();
closeConnection();
state = RMState.STOPPING;
while (state != RMState.STOPPED && state != RMState.ERROR) {
try {
stateCondition.await(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
doAfterCloseConnection();
}
finally {
stateLock.unlock();
runtimeSystem.removeRuntimeEventListener(this);
monitor.done();
}
}
/* (non-Javadoc)
* @see org.eclipse.ptp.rmsystem.AbstractResourceManager#doStartup(org.eclipse.core.runtime.IProgressMonitor)
*/
protected boolean doStartup(IProgressMonitor monitor) throws CoreException {
if (monitor == null) {
monitor = new NullProgressMonitor();
}
stateLock.lock();
try {
if (state == RMState.STOPPED) {
monitor.beginTask("Runtime resource manager startup", 10);
doBeforeOpenConnection();
monitor.subTask("Creating runtime system");
runtimeSystem = doCreateRuntimeSystem();
monitor.worked(1);
runtimeSystem.addRuntimeEventListener(this);
monitor.worked(2);
monitor.subTask("Starting runtime system");
if (!runtimeSystem.startup()) {
monitor.worked(7);
state = RMState.ERROR;
return false;
}
monitor.worked(7);
state = RMState.STARTING;
while (!monitor.isCanceled() && state != RMState.STARTED && state != RMState.ERROR) {
try {
stateCondition.await(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// Expect to be interrupted if monitor is canceled
}
}
if (state == RMState.ERROR) {
return false;
}
if (monitor.isCanceled()) {
state = RMState.STOPPED;
abortConnection();
return false;
}
doAfterOpenConnection();
}
}
finally {
stateLock.unlock();
monitor.done();
}
return true;
}
protected IPJob doSubmitJob(AttributeManager attrMgr, IProgressMonitor monitor) throws CoreException {
if (monitor == null) {
monitor = new NullProgressMonitor();
}
subLock.lock();
try {
String jobSubId = runtimeSystem.submitJob(attrMgr);
jobSubmissions.put(jobSubId, new JobSubmission());
while (!monitor.isCanceled()) {
JobSubmission sub = jobSubmissions.get(jobSubId);
if (sub.getState() != JobSubState.SUBMITTED) {
break;
}
try {
subCondition.await(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// Expect to be interrupted if monitor is canceled
}
}
JobSubmission sub = jobSubmissions.remove(jobSubId);
IPJob job = null;
if (sub != null) {
if (sub.getState() == JobSubState.SUBMITTED && monitor.isCanceled()) {
/*
* The job submission process itself can't be canceled, so
* this will just cancel the job once it is queued.
*
* If job is null, then we must wait for the submission to
* complete and the job to be created (this will need to happen
* in a thread).
*/
//FIXME: implement this
} else {
job = sub.getJob();
}
}
return job;
}
finally {
subLock.unlock();
monitor.done();
}
}
protected void doTerminateJob(IPJob job) throws CoreException {
runtimeSystem.terminateJob(job);
}
/**
* Template pattern method to actually update the jobs.
*
* @param job
* @param attrs
* @return changes were made
*/
abstract protected boolean doUpdateJobs(IPQueueControl queue,
Collection<IPJobControl> jobs, AttributeManager attrs);
/**
* Template pattern method to actually update the machines.
*
* @param machine
* @param attrs
* @return changes were made
*/
abstract protected boolean doUpdateMachines(Collection<IPMachineControl> machines, AttributeManager attrs);
/**
* Template pattern method to update a collection of nodes.
*
* @param machine parent machine
* @param nodes collection of nodes to update
* @param attrs new/changed attibutes for each node in the collection
* @return changes were made
*/
protected abstract boolean doUpdateNodes(IPMachineControl machine,
Collection<IPNodeControl> nodes, AttributeManager attrs);
/**
* Template pattern method to actually update the processes.
*
* @param job parent job
* @param processes collection of processes to update
* @param attrs new/changed attibutes for each node in the collection
* @return changes were made
*/
protected abstract boolean doUpdateProcesses(IPJobControl job,
Collection<IPProcessControl> processes, AttributeManager attrs);
/**
* Template pattern method to actually update the queues.
*
* @param queue
* @param attrs
* @return changes were made
*/
protected abstract boolean doUpdateQueues(Collection<IPQueueControl> queues, AttributeManager attrs);
/**
* @return the runtimeSystem
*/
protected IRuntimeSystem getRuntimeSystem() {
return runtimeSystem;
}
}