blob: 939b399497e5b2fb3c39bd5c54a90d741006aa01 [file] [log] [blame]
package org.eclipse.stem.graphsynchronizer.impl;
/*******************************************************************************
* Copyright (c) 2011 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* IBM Corporation - initial API and implementation
*******************************************************************************/
/**
* A messenger that sends and receives messages over TCP/IP. Since it is expected to run
* inside Eclipse, it uses Jobs instead of Threads.
*/
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Properties;
import java.util.StringTokenizer;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import org.eclipse.core.runtime.IProgressMonitor;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
import org.eclipse.core.runtime.jobs.Job;
import org.eclipse.stem.graphsynchronizer.Messenger;
import org.eclipse.stem.graphsynchronizer.MessengerException;
import org.eclipse.stem.jobs.Activator;
public class NetworkMessenger implements Messenger {
public static String REMOTEHOSTSKEY = "REMOTEHOSTS";
public static String PORTKEY = "PORT";
public static int DEFAULT_PORT = 1211;
private String [] hosts;
private byte [][] recvMessages;
protected InetAddress [] hostAddresses;
private int com_port;
protected static Job acceptJob;
// Used to block until data is received from a host;
protected static CyclicBarrier [] dataReceivedBarriers;
/**
* Initialize the messenger
*/
public void initialize(Properties p) throws MessengerException {
String remoteHosts = p.getProperty(REMOTEHOSTSKEY);
if(remoteHosts == null)
throw new MessengerException("Missing remote hosts specification");
String port = p.getProperty(PORTKEY);
if(port == null) com_port = DEFAULT_PORT;
else com_port = Integer.parseInt(port);
StringTokenizer st = new StringTokenizer(remoteHosts, ";");
hosts = new String[st.countTokens()];
int n=0;
while(st.hasMoreTokens()) hosts[n++] = st.nextToken();
dataReceivedBarriers = new CyclicBarrier[hosts.length];
recvMessages = new byte[hosts.length][];
hostAddresses = new InetAddress[hosts.length];
for(int i=0;i<hosts.length;++i) {
dataReceivedBarriers[i] = new CyclicBarrier(2);
try {
hostAddresses[i] = InetAddress.getByName(hosts[i]);
} catch(UnknownHostException e) {
throw new MessengerException("Unable to resolve host "+hosts[i], e);
}
}
}
public void start() throws MessengerException {
try {
ServerSocket serverSocket = new ServerSocket(com_port);
acceptJob = new SocketListenerJob("Socket listener ",serverSocket);
acceptJob.schedule();
} catch(Exception e) {
throw new MessengerException("Unable to start messenger", e);
}
}
public void stop() throws MessengerException {
wakeupBlockedRecv();
acceptJob.cancel();
}
public void sendMsg(byte [] msg, int n) throws MessengerException {
try {
InetAddress otherHost = hostAddresses[n];
Socket soc = new Socket(otherHost.getHostName(), com_port);
ObjectOutputStream oos = new ObjectOutputStream(soc.getOutputStream());
oos.writeObject(msg);
oos.close();
soc.close();
} catch(Exception e) {
throw new MessengerException("Exception encountered sending message to "+hostAddresses[n], e);
}
}
public byte [] recvMsg(int n) throws MessengerException {
try {
dataReceivedBarriers[n].await();
} catch(Exception e) {
throw new MessengerException("Problem receiving message from host "+n);
}
return recvMessages[n];
}
protected class SocketListenerJob extends Job {
ServerSocket socket;
public SocketListenerJob(String name) {
super(name);
}
public SocketListenerJob(String name, ServerSocket socket) {
super(name);
this.socket = socket;
}
@Override
protected IStatus run(IProgressMonitor monitor) {
try {
while(!socket.isClosed()) {
Socket s = socket.accept();
if(socket.isClosed()) break;
// Which host is connecting?
InetAddress addr = s.getInetAddress();
int h = -1;
for(int i=0;i<hostAddresses.length;++i)
if(addr.equals(hostAddresses[i])) h = i;
if(h == -1)
throw new MessengerException("Unable to determine which host connected, address was "+addr);
new ConnectionHandler("Accepting connection", s, h);
}
} catch(SocketException se) {
// The socket was most likely closed since the user shut down STEM
}
catch(IOException ioe) {
ioe.printStackTrace();
}catch(MessengerException me) {
me.printStackTrace();
}
return Status.OK_STATUS;
}
@Override
protected void canceling() {
try {
socket.close();
this.join();
} catch(Exception e) {
e.printStackTrace();
}
}
}
protected class ConnectionHandler extends Job {
private Socket soc;
private int otherRank;
public ConnectionHandler(String n) {
super(n);
}
public ConnectionHandler(String n, Socket s, int h) {
super(n);
soc = s;
otherRank = h;
this.schedule();
}
@Override
protected IStatus run(IProgressMonitor monitor) {
ObjectInputStream ois = null;
try {
ois = new ObjectInputStream(soc.getInputStream());
} catch (IOException e1) {
Activator.logError("Error reading data on socket", e1);
}
try {
byte[] msg = (byte[])ois.readObject();
recvMessages[otherRank] = msg;
ois.close();
soc.close();
} catch(Exception e) {
e.printStackTrace(); // What do we do when an exception happens here?
}
// Wake up anybody waiting for the message
try{
dataReceivedBarriers[otherRank].await();
} catch(BrokenBarrierException bbe) {
Activator.logError("Broken barrier waiting for data", bbe);
} catch(InterruptedException ie) {
Activator.logError("Interrupted waiting for data", ie);
}
return Status.OK_STATUS;
}
}
public void wakeupBlockedRecv() throws MessengerException {
int n=0;
for(CyclicBarrier cb:dataReceivedBarriers) {
if(cb.getNumberWaiting() == 1) {
recvMessages[n] = null; // null means no data or interrupted
try {
cb.await(); // will return righ away and wake up the blocked party
} catch(Exception e) {
throw new MessengerException("Problem waking up blocked receiver", e);
}
}
++n;
}
}
}