/*******************************************************************************
 * Copyright (c) 2011 University of Illinois All rights reserved. This program
 * and the accompanying materials are made available under the terms of the
 * Eclipse Public License v1.0 which accompanies this distribution, and is
 * available at http://www.eclipse.org/legal/epl-v10.html 
 * 	
 * Contributors: 
 * 	Albert L. Rossi - design and implementation
 ******************************************************************************/
package org.eclipse.ptp.rm.jaxb.core.runnable.command;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.List;
import java.util.Map;

import org.eclipse.core.runtime.CoreException;
import org.eclipse.core.runtime.IConfigurationElement;
import org.eclipse.core.runtime.IExtensionPoint;
import org.eclipse.core.runtime.IProgressMonitor;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Platform;
import org.eclipse.core.runtime.Status;
import org.eclipse.core.runtime.jobs.Job;
import org.eclipse.ptp.remote.core.IRemoteProcess;
import org.eclipse.ptp.remote.core.IRemoteProcessBuilder;
import org.eclipse.ptp.rm.jaxb.core.ICommandJobStreamsProxy;
import org.eclipse.ptp.rm.jaxb.core.IJAXBNonNLSConstants;
import org.eclipse.ptp.rm.jaxb.core.IJAXBResourceManagerControl;
import org.eclipse.ptp.rm.jaxb.core.IStreamParserTokenizer;
import org.eclipse.ptp.rm.jaxb.core.JAXBCorePlugin;
import org.eclipse.ptp.rm.jaxb.core.data.ArgType;
import org.eclipse.ptp.rm.jaxb.core.data.CommandType;
import org.eclipse.ptp.rm.jaxb.core.data.NameValuePairType;
import org.eclipse.ptp.rm.jaxb.core.data.TokenizerType;
import org.eclipse.ptp.rm.jaxb.core.data.impl.ArgImpl;
import org.eclipse.ptp.rm.jaxb.core.messages.Messages;
import org.eclipse.ptp.rm.jaxb.core.utils.CoreExceptionUtils;
import org.eclipse.ptp.rm.jaxb.core.utils.EnvironmentVariableUtils;
import org.eclipse.ptp.rm.jaxb.core.utils.RemoteServicesDelegate;
import org.eclipse.ptp.rm.jaxb.core.variables.RMVariableMap;

/**
 * Implementation of runnable Job for executing external processes. Uses the
 * IRemoteProcessBuilder with the IRemoteConnection for the resource manager's
 * target.
 * 
 * @author arossi
 * 
 */
public class CommandJob extends Job implements IJAXBNonNLSConstants {

	/**
	 * Internal class used for multiplexing output streams between two different
	 * endpoints, usually a tokenizer on the one hand and the stream proxy
	 * passed back to the caller on the other.
	 * 
	 * @author arossi
	 */
	private class StreamSplitter extends Thread {
		private final InputStream in;
		private final PipedOutputStream[] pout;
		private final BufferedOutputStream[] bout;

		/**
		 * @param in
		 *            the stream to be multiplexed
		 * @param pipe1
		 *            sink's stream from which it will read
		 * @param pipe2
		 *            sink's stream from which it will read
		 * @throws IOException
		 */
		private StreamSplitter(InputStream in, PipedInputStream pipe1, PipedInputStream pipe2) throws IOException {
			this.in = in;
			assert (pipe1 != null && pipe2 != null);
			pout = new PipedOutputStream[] { new PipedOutputStream(pipe1), new PipedOutputStream(pipe2) };
			bout = new BufferedOutputStream[] { new BufferedOutputStream(pout[0], STREAM_BUFFER_SIZE),
					new BufferedOutputStream(pout[1], STREAM_BUFFER_SIZE) };
		}

		/**
		 * Reads from input and writes to the piped streams.
		 */
		@Override
		public void run() {
			BufferedInputStream bin = new BufferedInputStream(in);
			while (true) {
				try {
					int i = bin.read();
					if (i == -1) {
						break;
					}
					for (BufferedOutputStream b : bout) {
						b.write(i);
						b.flush();
					}
				} catch (EOFException eof) {
					break;
				} catch (IOException t) {
					JAXBCorePlugin.log(t);
					break;
				}
			}
			for (BufferedOutputStream b : bout) {
				try {
					b.close();
				} catch (IOException t) {
					JAXBCorePlugin.log(t);
				}
			}
			// input stream closed elsewhere
		}
	}

	private final String uuid;
	private final CommandType command;
	private final IJAXBResourceManagerControl rm;
	private final ICommandJobStreamsProxy proxy;
	private final boolean waitForId;
	private final boolean ignoreExitStatus;
	private final boolean batch;

	private IRemoteProcess process;
	private IStreamParserTokenizer stdoutTokenizer;
	private IStreamParserTokenizer stderrTokenizer;
	private Thread stdoutT;
	private Thread stderrT;
	private InputStream tokenizerOut;
	private InputStream tokenizerErr;
	private StreamSplitter outSplitter;
	private StreamSplitter errSplitter;
	private String remoteOutPath;
	private String remoteErrPath;
	private final StringBuffer error;
	private boolean active;

	/**
	 * @param jobUUID
	 *            either internal or resource specific identifier
	 * @param command
	 *            JAXB data element
	 * @param batch
	 *            whether submission is batch or interactive
	 * @param rm
	 *            the calling resource manager
	 */
	public CommandJob(String jobUUID, CommandType command, boolean batch, IJAXBResourceManagerControl rm) {
		super(command.getName());
		this.command = command;
		this.batch = batch;
		this.rm = rm;
		this.uuid = jobUUID;
		this.proxy = new CommandJobStreamsProxy();
		this.waitForId = command.isWaitForId();
		this.ignoreExitStatus = command.isIgnoreExitStatus();
		this.error = new StringBuffer();
	}

	/**
	 * @return the process wrapper
	 */
	public IRemoteProcess getProcess() {
		return process;
	}

	/**
	 * @return object wrapping stream monitors.
	 */
	public ICommandJobStreamsProxy getProxy() {
		return proxy;
	}

	/**
	 * @return if job is active
	 */
	public boolean isActive() {
		boolean b = false;
		synchronized (this) {
			b = active;
		}
		return b;
	}

	/**
	 * @return if job is batch
	 */
	public boolean isBatch() {
		return batch;
	}

	/**
	 * Used by stream proxy to read stderr from file if submission is batch.
	 * 
	 * @param remoteErrPath
	 *            for stream redirection (batch submissions)
	 */
	public void setRemoteErrPath(String remoteErrPath) {
		this.remoteErrPath = remoteErrPath;
	}

	/**
	 * Used by stream proxy to read stdout from file if submission is batch.
	 * 
	 * @param remoteOutPath
	 */
	public void setRemoteOutPath(String remoteOutPath) {
		this.remoteOutPath = remoteOutPath;
	}

	/**
	 * The resource manager should wait for the job id on the stream (parsed by
	 * an apposite tokenizer) before returning the status object to the caller.
	 * 
	 * @return whether to wait
	 */
	public boolean waitForId() {
		return waitForId;
	}

	/**
	 * Uses the IRemoteProcessBuilder to set up the command and environment.
	 * After start, the tokenizers (if any) are handled, and stream redirection
	 * managed. Waits for the process, then joins on the consumers.<br>
	 * <br>
	 * Note: the resource manager does not join on this thread, but retrieves
	 * the status object from the job, potentially while it is still running, in
	 * order to hand it off to the caller for stream processing.
	 */
	@Override
	protected IStatus run(IProgressMonitor monitor) {
		try {
			synchronized (this) {
				active = false;
			}
			IRemoteProcessBuilder builder = prepareCommand();
			prepareEnv(builder);

			process = null;
			try {
				process = builder.start();
			} catch (IOException t) {
				throw CoreExceptionUtils.newException(Messages.CouldNotLaunch + builder.command(), t);
			}

			maybeInitializeTokenizers(builder);
			setOutStreamRedirection(process);
			setErrStreamRedirection(process);
			startConsumers(process);

			synchronized (this) {
				active = true;
			}

			int exit = 0;
			try {
				exit = process.waitFor();
			} catch (InterruptedException ignored) {
			}

			if (exit != 0 && !ignoreExitStatus) {
				String t = error.toString();
				error.setLength(0);
				throw CoreExceptionUtils.newException(Messages.ProcessExitValueError + (ZEROSTR + exit) + SP + CO + t, null);
			}

			joinConsumers();
		} catch (CoreException ce) {
			return ce.getStatus();
		} catch (Throwable t) {
			return CoreExceptionUtils.getErrorStatus(Messages.ProcessRunError, t);
		}
		synchronized (this) {
			active = false;
		}
		return Status.OK_STATUS;
	}

	private void errorStreamReader(final InputStream err) {
		new Thread() {
			@Override
			public void run() {
				BufferedReader br = new BufferedReader(new InputStreamReader(err));
				while (true) {
					try {
						String line = br.readLine();
						if (line == null) {
							break;
						}
						error.append(line).append(LINE_SEP);
					} catch (EOFException eof) {
						break;
					} catch (IOException io) {
						JAXBCorePlugin.log(io);
						break;
					}
				}
			}
		}.start();
	}

	/**
	 * Wait for any stream consumer threads to exit.
	 * 
	 * @throws CoreException
	 */
	private void joinConsumers() throws CoreException {
		Throwable t = null;

		if (outSplitter != null) {
			try {
				outSplitter.join();
			} catch (InterruptedException ignored) {
			}
		}

		if (errSplitter != null) {
			try {
				errSplitter.join();
			} catch (InterruptedException ignored) {
			}
		}

		if (stdoutT != null) {
			try {
				stdoutT.join();
			} catch (InterruptedException ignored) {
			}
			t = stdoutTokenizer.getInternalError();
		}

		if (stderrT != null) {
			try {
				stderrT.join();
			} catch (InterruptedException ignored) {
			}
			t = stderrTokenizer.getInternalError();
		}

		if (t != null) {
			throw CoreExceptionUtils.newException(Messages.ParserInternalError, t);
		}
	}

	/**
	 * Checks to see what tokenizers are configured for this resource manager.
	 * If the two streams have been joined, it will prefer the redirect parser
	 * if it exists; otherwise the joined streams will be parsed by the stdout
	 * parser.<br>
	 * <br>
	 * If there is a custom extension tokenizer, it will be instantiated here.
	 * 
	 * @param builder
	 * @throws CoreException
	 */
	private void maybeInitializeTokenizers(IRemoteProcessBuilder builder) throws CoreException {
		TokenizerType t = null;

		if (builder.redirectErrorStream()) {
			t = command.getRedirectParser();
		}

		if (t == null) {
			t = command.getStdoutParser();
		}

		if (t != null) {
			try {
				String type = t.getType();
				if (type != null) {
					stdoutTokenizer = getTokenizer(type);
				} else {
					stdoutTokenizer = new ConfigurableRegexTokenizer(uuid, t);
				}
			} catch (Throwable e) {
				throw CoreExceptionUtils.newException(Messages.StdoutParserError, e);
			}
		}

		t = command.getStderrParser();
		if (t != null) {
			try {
				String type = t.getType();
				if (type != null) {
					stderrTokenizer = getTokenizer(type);
				} else {
					stderrTokenizer = new ConfigurableRegexTokenizer(uuid, t);
				}
			} catch (Throwable e) {
				throw CoreExceptionUtils.newException(Messages.StdoutParserError, e);
			}
		}
	}

	/**
	 * Resolves the command arguments against the current environment, then gets
	 * the process builder from the remote connection.
	 * 
	 * @return the process builder
	 * @throws CoreException
	 */
	private IRemoteProcessBuilder prepareCommand() throws CoreException {
		List<ArgType> args = command.getArg();
		if (args == null) {
			throw CoreExceptionUtils.newException(Messages.MissingArglistFromCommandError, null);
		}
		RMVariableMap map = RMVariableMap.getActiveInstance();
		String[] cmdArgs = ArgImpl.getArgs(uuid, args, map);
		RemoteServicesDelegate delegate = rm.getRemoteServicesDelegate();
		return delegate.getRemoteServices().getProcessBuilder(delegate.getRemoteConnection(), cmdArgs);
	}

	/**
	 * Either appends to or replaces the process builder's environment with the
	 * Launch Configuration environment variables. Also sets redirectErrorStream
	 * on the builder.
	 * 
	 * @param builder
	 * @throws CoreException
	 */
	private void prepareEnv(IRemoteProcessBuilder builder) throws CoreException {
		if (!rm.getAppendEnv()) {
			builder.environment().clear();
			Map<String, String> live = rm.getLaunchEnv();
			for (String var : live.keySet()) {
				builder.environment().put(var, live.get(var));
			}
		} else {
			if (command.isReplaceEnvironment()) {
				builder.environment().clear();
			}
			/*
			 * first static env, then dynamic
			 */
			List<NameValuePairType> vars = command.getEnvironment();
			RMVariableMap map = RMVariableMap.getActiveInstance();
			for (NameValuePairType var : vars) {
				EnvironmentVariableUtils.addVariable(uuid, var, builder.environment(), map);
			}

			Map<String, String> live = rm.getLaunchEnv();
			for (String var : live.keySet()) {
				builder.environment().put(var, live.get(var));
			}
		}

		builder.redirectErrorStream(command.isRedirectStderr());
	}

	/**
	 * Configures handling of the error stream. If there is a tokenizer, it
	 * first checks to see if there will be redirection from a remote file, and
	 * if not, splits the stream between the proxy and the tokenizer in the case
	 * of an interactive job; if there is a remote file, that stream is given to
	 * the proxy and the tokenizer gets the stderr of the submission process. If
	 * there is no tokenizer, then the proxy gets either stderr of the
	 * submission process if interactive, or redirection from the remote file,
	 * accordingly.
	 * 
	 * @param process
	 * @throws IOException
	 */
	private void setErrStreamRedirection(IRemoteProcess process) throws IOException {
		if (stderrTokenizer != null) {
			if (remoteErrPath != null) {
				tokenizerErr = process.getErrorStream();
				proxy.setErrMonitor(new CommandJobStreamTailFMonitor(rm, remoteErrPath));
			} else if (!batch) {
				PipedInputStream tokenizerErr = new PipedInputStream();
				this.tokenizerErr = tokenizerErr;
				PipedInputStream monitorErr = new PipedInputStream();
				errSplitter = new StreamSplitter(process.getErrorStream(), tokenizerErr, monitorErr);
				proxy.setErrMonitor(new CommandJobStreamMonitor(monitorErr));
			} else {
				tokenizerErr = process.getErrorStream();
			}
		} else if (remoteErrPath != null) {
			proxy.setErrMonitor(new CommandJobStreamTailFMonitor(rm, remoteErrPath));
			/*
			 * grab error stream for error reporting
			 */
			errorStreamReader(process.getErrorStream());
		} else if (!batch) {
			proxy.setErrMonitor(new CommandJobStreamMonitor(process.getErrorStream()));
		}
	}

	/**
	 * Configures handling of the stdout stream. If there is a tokenizer, it
	 * first checks to see if there will be redirection from a remote file, and
	 * if not, splits the stream between the proxy and the tokenizer in the case
	 * of an interactive job; if there is a remote file, that stream is given to
	 * the proxy and the tokenizer gets the stdout of the submission process. If
	 * there is no tokenizer, then the proxy gets either stdout of the
	 * submission process if interactive, or redirection from the remote file,
	 * accordingly.
	 * 
	 * @param process
	 * @throws IOException
	 */
	private void setOutStreamRedirection(IRemoteProcess process) throws IOException {
		if (stdoutTokenizer != null) {
			if (remoteOutPath != null) {
				tokenizerOut = process.getInputStream();
				proxy.setOutMonitor(new CommandJobStreamTailFMonitor(rm, remoteOutPath));
			} else if (!batch) {
				PipedInputStream tokenizerOut = new PipedInputStream();
				this.tokenizerOut = tokenizerOut;
				PipedInputStream monitorOut = new PipedInputStream();
				outSplitter = new StreamSplitter(process.getInputStream(), tokenizerOut, monitorOut);
				proxy.setOutMonitor(new CommandJobStreamMonitor(monitorOut));
			} else {
				tokenizerOut = process.getInputStream();
			}
		} else if (remoteOutPath != null) {
			proxy.setOutMonitor(new CommandJobStreamTailFMonitor(rm, remoteOutPath));
			errorStreamReader(process.getInputStream());
		} else if (!batch) {
			proxy.setOutMonitor(new CommandJobStreamMonitor(process.getInputStream()));
		}
	}

	/**
	 * Initiates stream reading on all consumers.
	 * 
	 * @param process
	 * @throws CoreException
	 */
	private void startConsumers(IRemoteProcess process) throws CoreException {
		if (outSplitter != null) {
			outSplitter.start();
		}

		if (errSplitter != null) {
			errSplitter.start();
		}

		if (remoteOutPath == null) {
			proxy.startMonitors();
		}

		if (stdoutTokenizer != null) {
			try {
				stdoutTokenizer.setInputStream(tokenizerOut);
				stdoutT = new Thread(stdoutTokenizer);
				stdoutT.start();
			} catch (Throwable e) {
				throw CoreExceptionUtils.newException(Messages.StdoutParserError, e);
			}
		}
		if (stderrTokenizer != null) {
			try {
				stderrTokenizer.setInputStream(tokenizerErr);
				stderrT = new Thread(stderrTokenizer);
				stderrT.start();
			} catch (Throwable e) {
				throw CoreExceptionUtils.newException(Messages.StderrParserError, e);
			}
		}
	}

	/**
	 * Extension-based instantiation for custom tokenizer.
	 * 
	 * @param type
	 *            extension name
	 * @return the tokenizer instance
	 * @throws CoreException
	 */
	public static IStreamParserTokenizer getTokenizer(String type) throws CoreException {
		IExtensionPoint extensionPoint = Platform.getExtensionRegistry().getExtensionPoint(JAXBCorePlugin.PLUGIN_ID,
				TOKENIZER_EXT_PT);
		IConfigurationElement[] elements = extensionPoint.getConfigurationElements();
		for (int i = 0; i < elements.length; i++) {
			IConfigurationElement element = elements[i];
			try {
				if (element.getAttribute(ID).equals(type)) {
					return (IStreamParserTokenizer) element.createExecutableExtension(CLASS);
				}
			} catch (CoreException ce) {
				throw ce;
			} catch (Throwable t) {
				throw CoreExceptionUtils.newException(Messages.StreamTokenizerInstantiationError + type, t);
			}
		}
		return null;
	}
}
