| /******************************************************************************* |
| * Copyright (c) 2011 University of Illinois All rights reserved. This program |
| * and the accompanying materials are made available under the terms of the |
| * Eclipse Public License v1.0 which accompanies this distribution, and is |
| * available at http://www.eclipse.org/legal/epl-v10.html |
| * |
| * Contributors: |
| * Albert L. Rossi - design and implementation |
| ******************************************************************************/ |
| package org.eclipse.ptp.rm.jaxb.core.runnable.command; |
| |
| import org.eclipse.core.runtime.NullProgressMonitor; |
| import org.eclipse.debug.core.ILaunchConfiguration; |
| import org.eclipse.debug.core.model.IStreamsProxy; |
| import org.eclipse.ptp.remote.core.IRemoteProcess; |
| import org.eclipse.ptp.rm.jaxb.core.ICommandJobStatus; |
| import org.eclipse.ptp.rm.jaxb.core.ICommandJobStreamsProxy; |
| import org.eclipse.ptp.rm.jaxb.core.IJAXBNonNLSConstants; |
| import org.eclipse.ptp.rm.jaxb.core.IJAXBResourceManagerControl; |
| import org.eclipse.ptp.rm.jaxb.core.JAXBCorePlugin; |
| import org.eclipse.ptp.rm.jaxb.core.data.AttributeType; |
| import org.eclipse.ptp.rm.jaxb.core.data.PropertyType; |
| import org.eclipse.ptp.rm.jaxb.core.utils.FileUtils; |
| import org.eclipse.ptp.rm.jaxb.core.utils.RemoteServicesDelegate; |
| import org.eclipse.ptp.rm.jaxb.core.variables.RMVariableMap; |
| import org.eclipse.ptp.rmsystem.IJobStatus; |
| |
| /** |
| * Extension of the IJobStatus class to handle resource manager command jobs. |
| * Also handles availability notification for remote stdout and stderr files. |
| * |
| * @author arossi |
| * |
| */ |
| public class CommandJobStatus implements ICommandJobStatus { |
| |
| private final String rmUniqueName; |
| private final IJAXBResourceManagerControl control; |
| |
| private String jobId; |
| private ILaunchConfiguration launchConfig; |
| private String state; |
| private String stateDetail; |
| private String remoteOutputPath; |
| private String remoteErrorPath; |
| private ICommandJobStreamsProxy proxy; |
| private IRemoteProcess process; |
| |
| private boolean waitEnabled; |
| private long lastUpdateRequest; |
| private boolean dirty = false; |
| private boolean fFilesChecked = false; |
| |
| /** |
| * @param rmUniqueName |
| * owner resource manager |
| * @param control |
| * resource manager control |
| */ |
| public CommandJobStatus(String rmUniqueName, IJAXBResourceManagerControl control) { |
| this(rmUniqueName, null, UNDETERMINED, control); |
| } |
| |
| /** |
| * @param rmUniqueName |
| * owner resource manager |
| * @param jobId |
| * @param state |
| * @param control |
| * resource manager control |
| */ |
| public CommandJobStatus(String rmUniqueName, String jobId, String state, IJAXBResourceManagerControl control) { |
| this.rmUniqueName = rmUniqueName; |
| this.jobId = jobId; |
| this.state = state; |
| this.control = control; |
| assert (null != control); |
| waitEnabled = true; |
| lastUpdateRequest = 0; |
| } |
| |
| /** |
| * Closes the proxy and calls destroy on the process. Used for interactive |
| * job cancellation. |
| */ |
| public synchronized boolean cancel() { |
| if (process != null) { |
| process.destroy(); |
| if (proxy != null) { |
| proxy.close(); |
| } |
| return true; |
| } |
| return false; |
| } |
| |
| /** |
| * Notifies all callers of <code>waitForId</code> to exit wait. |
| */ |
| public void cancelWait() { |
| synchronized (this) { |
| waitEnabled = false; |
| notifyAll(); |
| } |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.eclipse.ptp.rm.jaxb.core.ICommandJobStatus#getControl() |
| */ |
| public IJAXBResourceManagerControl getControl() { |
| return control; |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.eclipse.ptp.rmsystem.IJobStatus#getErrorPath() |
| */ |
| public String getErrorPath() { |
| return remoteErrorPath; |
| } |
| |
| /** |
| * @return jobId either internal UUID or resource-specific id |
| */ |
| public synchronized String getJobId() { |
| return jobId; |
| } |
| |
| public synchronized long getLastUpdateRequest() { |
| return lastUpdateRequest; |
| } |
| |
| /** |
| * @return configuration used for this submission. |
| */ |
| public ILaunchConfiguration getLaunchConfiguration() { |
| return launchConfig; |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.eclipse.ptp.rmsystem.IJobStatus#getOutputPath() |
| */ |
| public String getOutputPath() { |
| return remoteOutputPath; |
| } |
| |
| /** |
| * |
| * @return owner resource manager id |
| */ |
| public String getRmUniqueName() { |
| return rmUniqueName; |
| } |
| |
| /** |
| * @return state of the job (not of the submission process). |
| */ |
| public synchronized String getState() { |
| return state; |
| } |
| |
| /** |
| * @return more specific state identifier. |
| */ |
| public synchronized String getStateDetail() { |
| return stateDetail; |
| } |
| |
| /** |
| * Wrapper containing monitoring functionality for the associated output and |
| * error streams. |
| */ |
| public IStreamsProxy getStreamsProxy() { |
| return proxy; |
| } |
| |
| /* |
| * NOTE: since the script/job attribute defining this path is generated |
| * prior to submission, @jobId cannot appear in the path; at the same time, |
| * a batch variable replacement will not work, as that would not be |
| * interpretable for the RM. One actually needs to configure two separate |
| * strings in this case, giving one to the script and one to the resource |
| * manager. We treat the path as requiring a possible substitution of the |
| * jobId tag. |
| * |
| * @see |
| * org.eclipse.ptp.rm.jaxb.core.ICommandJobStatus#initialize(java.lang.String |
| * ) |
| */ |
| public void initialize(String jobId) { |
| this.jobId = jobId; |
| String path = null; |
| RMVariableMap rmVarMap = control.getEnvironment(); |
| Object o = rmVarMap.get(STDOUT_REMOTE_FILE); |
| if (o != null) { |
| if (o instanceof PropertyType) { |
| path = (String) ((PropertyType) o).getValue(); |
| } else if (o instanceof AttributeType) { |
| path = (String) ((PropertyType) o).getValue(); |
| } |
| path = rmVarMap.getString(path); |
| remoteOutputPath = path.replaceAll(IJAXBNonNLSConstants.JOB_ID_TAG, jobId); |
| } |
| o = rmVarMap.get(STDERR_REMOTE_FILE); |
| if (o != null) { |
| if (o instanceof PropertyType) { |
| path = (String) ((PropertyType) o).getValue(); |
| } else if (o instanceof AttributeType) { |
| path = (String) ((AttributeType) o).getValue(); |
| } |
| path = rmVarMap.getString(path); |
| remoteErrorPath = path.replaceAll(IJAXBNonNLSConstants.JOB_ID_TAG, jobId); |
| } |
| } |
| |
| /** |
| * @return whether a process object has been attached to this status object |
| * (in which case the submission is not through an asynchronous job |
| * scheduler). |
| */ |
| public boolean isInteractive() { |
| return process != null; |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see |
| * org.eclipse.ptp.rm.jaxb.core.ICommandJobStatus#maybeWaitForHandlerFiles |
| * (int) |
| */ |
| public void maybeWaitForHandlerFiles(int blockForSecs) { |
| if (fFilesChecked) { |
| return; |
| } |
| |
| Thread tout = null; |
| Thread terr = null; |
| |
| if (remoteOutputPath != null) { |
| tout = checkForReady(remoteOutputPath, blockForSecs); |
| |
| } |
| |
| if (remoteErrorPath != null) { |
| terr = checkForReady(remoteErrorPath, blockForSecs); |
| } |
| |
| if (tout != null) { |
| try { |
| tout.join(); |
| } catch (InterruptedException ignored) { |
| } |
| } |
| |
| if (terr != null) { |
| try { |
| terr.join(); |
| } catch (InterruptedException ignored) { |
| } |
| } |
| setState(IJobStatus.JOB_OUTERR_READY); |
| fFilesChecked = true; |
| } |
| |
| /** |
| * @param launchConfig |
| * configuration used for this submission. |
| */ |
| public void setLaunchConfig(ILaunchConfiguration launchConfig) { |
| this.launchConfig = launchConfig; |
| } |
| |
| /** |
| * @param process |
| * object (used for interactive cancellation) |
| */ |
| public void setProcess(IRemoteProcess process) { |
| this.process = process; |
| } |
| |
| /** |
| * We also immediately dereference any paths associated with the job by |
| * calling intialize, as the jobId property may not be in the environment |
| * after this initial call returns. |
| * |
| * @param proxy |
| * Wrapper containing monitoring functionality for the associated |
| * output and error streams. |
| */ |
| public void setProxy(ICommandJobStreamsProxy proxy) { |
| this.proxy = proxy; |
| initialize(jobId); |
| } |
| |
| /** |
| * @param state |
| * of the job (not of the submission process). |
| */ |
| public synchronized void setState(String state) { |
| dirty = false; |
| String previousDetail = stateDetail; |
| if (UNDETERMINED.equals(state)) { |
| this.state = UNDETERMINED; |
| stateDetail = UNDETERMINED; |
| } else if (SUBMITTED.equals(state)) { |
| this.state = SUBMITTED; |
| stateDetail = SUBMITTED; |
| } else if (RUNNING.equals(state)) { |
| this.state = RUNNING; |
| stateDetail = RUNNING; |
| } else if (SUSPENDED.equals(state)) { |
| this.state = SUSPENDED; |
| stateDetail = SUSPENDED; |
| } else if (COMPLETED.equals(state)) { |
| this.state = COMPLETED; |
| stateDetail = COMPLETED; |
| } else if (QUEUED_ACTIVE.equals(state)) { |
| this.state = SUBMITTED; |
| stateDetail = QUEUED_ACTIVE; |
| } else if (SYSTEM_ON_HOLD.equals(state)) { |
| this.state = SUBMITTED; |
| stateDetail = SYSTEM_ON_HOLD; |
| } else if (USER_ON_HOLD.equals(state)) { |
| this.state = SUBMITTED; |
| stateDetail = USER_ON_HOLD; |
| } else if (USER_SYSTEM_ON_HOLD.equals(state)) { |
| this.state = SUBMITTED; |
| stateDetail = USER_SYSTEM_ON_HOLD; |
| } else if (SYSTEM_SUSPENDED.equals(state)) { |
| this.state = SUSPENDED; |
| stateDetail = SYSTEM_SUSPENDED; |
| } else if (USER_SUSPENDED.equals(state)) { |
| this.state = SUSPENDED; |
| stateDetail = USER_SUSPENDED; |
| } else if (USER_SYSTEM_SUSPENDED.equals(state)) { |
| this.state = SUSPENDED; |
| stateDetail = USER_SYSTEM_SUSPENDED; |
| } else if (FAILED.equals(state)) { |
| this.state = COMPLETED; |
| stateDetail = FAILED; |
| } else if (JOB_OUTERR_READY.equals(state)) { |
| this.state = COMPLETED; |
| stateDetail = JOB_OUTERR_READY; |
| } |
| if (previousDetail == null || !previousDetail.equals(stateDetail)) { |
| dirty = true; |
| } |
| } |
| |
| /** |
| * @param time |
| * in milliseconds of last update request issued to remote |
| * resource |
| */ |
| public synchronized void setUpdateRequestTime(long update) { |
| lastUpdateRequest = update; |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.eclipse.ptp.rm.jaxb.core.ICommandJobStatus#stateChanged() |
| */ |
| public boolean stateChanged() { |
| boolean changed = dirty; |
| dirty = false; |
| return changed; |
| } |
| |
| /** |
| * Wait until the jobId has been set on the job id property in the |
| * environment. |
| * |
| * @param uuid |
| * key for the property containing as its name the |
| * resource-specific jobId and as its value its initial state |
| * (SUBMITTED) |
| */ |
| public void waitForJobId(String uuid) { |
| synchronized (this) { |
| while (waitEnabled && jobId == null) { |
| try { |
| wait(1000); |
| } catch (InterruptedException ignored) { |
| } |
| RMVariableMap env = control.getEnvironment(); |
| if (env == null) { |
| break; |
| } |
| PropertyType p = (PropertyType) env.get(uuid); |
| if (p != null) { |
| jobId = p.getName(); |
| String v = (String) p.getValue(); |
| if (v != null) { |
| state = v; |
| } |
| |
| } |
| } |
| } |
| } |
| |
| /** |
| * Checks for file existence, then waits 3 seconds to compare file length. |
| * If block is false, the listeners may be notified that the file is still |
| * not ready; else the listeners will receive a ready = true notification |
| * when the file does finally stabilize. (non-Javadoc) |
| * |
| * @param path |
| * @param blockInSeconds |
| * @return thread running the check |
| */ |
| private Thread checkForReady(final String path, final int block) { |
| Thread t = new Thread() { |
| @Override |
| public void run() { |
| boolean ready = false; |
| long timeout = block * 1000; |
| RemoteServicesDelegate d = control.getRemoteServicesDelegate(); |
| long start = System.currentTimeMillis(); |
| while (!ready) { |
| try { |
| ready = FileUtils.isStable(d.getRemoteFileManager(), path, 3, new NullProgressMonitor()); |
| } catch (Throwable t) { |
| JAXBCorePlugin.log(t); |
| } |
| |
| if (System.currentTimeMillis() - start >= timeout) { |
| break; |
| } |
| |
| synchronized (this) { |
| try { |
| wait(IJAXBNonNLSConstants.READY_FILE_PAUSE); |
| } catch (InterruptedException ignored) { |
| } |
| } |
| } |
| } |
| }; |
| t.start(); |
| return t; |
| } |
| } |