blob: 02774c500202045c6b9d638ef49ff091cd369fe6 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2009, 2019 Xored Software Inc and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/legal/epl-v20.html
*
* Contributors:
* Xored Software Inc - initial API and implementation and/or initial documentation
*******************************************************************************/
package org.eclipse.rcptt.ecl.client.tcp;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.core.runtime.CoreException;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
import org.eclipse.emf.ecore.util.EcoreUtil;
import org.eclipse.rcptt.ecl.core.Command;
import org.eclipse.rcptt.ecl.internal.core.CorePlugin;
import org.eclipse.rcptt.ecl.internal.core.IMarkeredPipe;
import org.eclipse.rcptt.ecl.internal.core.Pipe;
import org.eclipse.rcptt.ecl.internal.core.Process;
import org.eclipse.rcptt.ecl.runtime.CoreUtils;
import org.eclipse.rcptt.ecl.runtime.IPipe;
import org.eclipse.rcptt.ecl.runtime.IProcess;
import org.eclipse.rcptt.ecl.runtime.ISession;
public class EclTcpSession implements ISession {
private static final ExecutionNode CLOSE_NODE = new ExecutionNode();
private Map<String, Object> properties = new HashMap<String, Object>();
private final AtomicBoolean closed = new AtomicBoolean(false);
@SuppressWarnings("unused")
private InetAddress address;
@SuppressWarnings("unused")
private int port;
private Socket socket;
private String sessionID;
private static class ExecutionNode {
Command command;
IPipe input;
IPipe output;
Process process;
}
private BlockingQueue<ExecutionNode> commands = new LinkedBlockingQueue<ExecutionNode>(
10);
private Thread processingThread;
public EclTcpSession(InetAddress address, int port) throws IOException {
NetworkUtil.initTimeouts();
this.address = address;
this.port = port;
try {
initSocket(address, port, true);
} catch (IOException e) {
initSocket(address, port, false);
EclTcpClientPlugin
.logInfo("Could not open a session with NO_DELAY and SO_REUSEADDR, succeeded with default socket");
}
processingThread = new Thread(new Runnable() {
public void run() {
try {
while (!closed.get()) {
ExecutionNode node = null;
IMarkeredPipe pipe = null;
try {
node = commands.take();
if (CLOSE_NODE.equals(node)) {
return;
}
pipe = CoreUtils.createEMFPipe(
socket.getInputStream(),
socket.getOutputStream());
pipe.write(node.command);
readInput(node.input, pipe);
pipe.writeCloseMarker();
IStatus result = writeOutput(node.output, pipe);
node.process.setStatus(result);
} catch (Throwable t) {
try {
if (node != null) {
IStatus status = (t instanceof CoreException)
? ((CoreException) t).getStatus()
: CorePlugin.err(t);
node.process.setStatus(new EclTcpSocketStatus(
status));
}
} catch (CoreException e1) {
CorePlugin.log(e1);
}
} finally {
if (pipe != null) {
pipe.closeNoWait();
}
}
}
} finally {
try {
closeSocket();
} catch (Throwable e) {
CorePlugin.log(e);
}
}
}
}, "ECL TCP session execute: " + sessionID);
processingThread.start();
}
private void initSocket(InetAddress address, int port, boolean nonDefaultSocket) throws SocketException,
IOException {
socket = new Socket();
if (nonDefaultSocket) {
socket.setReuseAddress(true);
try {
this.socket.setTcpNoDelay(true);
} catch (SocketException e) {
EclTcpClientPlugin.log(new Status(IStatus.ERROR, EclTcpClientPlugin.PLUGIN_ID,
"Error setting TCP_NODELAY on client socket"));
}
}
socket.connect(new InetSocketAddress(address, port));
initSessionId(socket);
}
private void initSessionId(Socket socket) throws IOException {
OutputStream outputStream = socket.getOutputStream();
InputStream inputStream = socket.getInputStream();
DataOutputStream dout = new DataOutputStream(outputStream);
DataInputStream din = new DataInputStream(inputStream);
dout.writeUTF("newsession");
dout.flush();
sessionID = din.readUTF();
}
public IPipe createPipe() {
return new Pipe();
}
public IProcess execute(final Command command) throws CoreException {
return execute(command, null, null);
}
public IProcess execute(final Command command, IPipe in, IPipe out)
throws CoreException {
ExecutionNode node = new ExecutionNode();
node.command = (Command) EcoreUtil.copy(command);
node.input = in == null ? createPipe().close(Status.OK_STATUS) : in;
node.output = out == null ? createPipe() : out;
node.process = new Process(this, node.input, node.output);
try {
commands.put(node);
} catch (InterruptedException e) {
throw new CoreException(new Status(Status.ERROR,
EclTcpClientPlugin.PLUGIN_ID,
"Failed to execute ecl command: "
+ command.getClass().getName(), e));
}
return node.process;
}
private void readInput(final IPipe input, IPipe pipe) throws CoreException {
while (true) {
Object object = input.take(Long.MAX_VALUE);
if (object instanceof IStatus) {
pipe.write(object);
break;
}
pipe.write(object);
}
}
private IStatus writeOutput(final IPipe output, IPipe pipe)
throws CoreException {
while (true) {
Object object = pipe.take(Long.MAX_VALUE);
if (object instanceof IStatus) {
return (IStatus) object;
} else {
output.write(object);
}
}
}
public void close() throws CoreException {
closed.compareAndSet(false, true);
try {
if (!Thread.currentThread().isInterrupted()) {
commands.put(CLOSE_NODE);
} else {
processingThread.interrupt();
}
closeSocket();
} catch (Throwable e) {
CorePlugin.log(e);
}
}
public boolean isClosed() {
return closed.get();
}
private synchronized void closeSocket() throws IOException {
if (socket != null) {
socket.close();
socket = null;
}
}
public synchronized void putProperty(String key, Object value) {
if (value == null) {
properties.remove(key);
} else {
properties.put(key, value);
}
}
public synchronized Object getProperty(String key) {
return properties.get(key);
}
}