blob: e9775228dd46de1db6141a8a631811e8955f31e6 [file] [log] [blame]
/******************************************************************************
* Copyright (c) 2009 Remy Chi Jian Suen and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Remy Chi Jian Suen - initial API and implementation
******************************************************************************/
package org.eclipse.ecf.provider.datashare.nio;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.eclipse.core.runtime.Assert;
import org.eclipse.core.runtime.ISafeRunnable;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.ListenerList;
import org.eclipse.core.runtime.SafeRunner;
import org.eclipse.core.runtime.Status;
import org.eclipse.ecf.core.IContainer;
import org.eclipse.ecf.core.IContainerListener;
import org.eclipse.ecf.core.events.IContainerConnectedEvent;
import org.eclipse.ecf.core.events.IContainerDisconnectedEvent;
import org.eclipse.ecf.core.events.IContainerDisposeEvent;
import org.eclipse.ecf.core.events.IContainerEvent;
import org.eclipse.ecf.core.identity.ID;
import org.eclipse.ecf.core.util.ECFException;
import org.eclipse.ecf.datashare.IChannel;
import org.eclipse.ecf.datashare.IChannelConfig;
import org.eclipse.ecf.datashare.IChannelContainerAdapter;
import org.eclipse.ecf.datashare.IChannelContainerListener;
import org.eclipse.ecf.datashare.IChannelListener;
import org.eclipse.ecf.datashare.events.IChannelContainerChannelActivatedEvent;
import org.eclipse.ecf.datashare.events.IChannelContainerChannelDeactivatedEvent;
import org.eclipse.ecf.datashare.events.IChannelContainerEvent;
/**
* A datashare channel container implementation that creates channels that use
* NIO for sending and receiving messages.
* <p>
* The channel container facilitates communication at a socket-level to a
* corresponding {@link NIOChannel}. When a request has been received from a
* remote peer to establish a channel connection, the request can be honoured
* via calling the {@link #enqueue(SocketAddress)} method with the remote peer's
* corresponding socket address as the parameter.
* </p>
* <p>
* Subclasses must implement the following:
* <ul>
* <li>For channel creation:
* <ul>
* <li>{@link #createNIOChannel(ID, IChannelListener, Map)}</li>
* </ul>
* <ul>
* <li>{@link #createNIOChannel(IChannelConfig)}</li>
* </ul>
* </li>
* <li>To facilitate the logging of statuses:
* <ul>
* <li>{@link #log(IStatus)}</li>
* </ul>
* </li>
* </ul>
* </p>
* <p>
* <b>Note:</b> This class/interface is part of an interim API that is still
* under development and expected to change significantly before reaching
* stability. It is being made available at this early stage to solicit feedback
* from pioneering adopters on the understanding that any code that uses this
* API will almost certainly be broken (repeatedly) as the API evolves.
* </p>
*/
public abstract class NIODatashareContainer implements IChannelContainerAdapter {
/**
 * The thread that establishes connections to remote clients, or
 * <code>null</code> if no such thread is currently running. The field is
 * volatile so that a write made by {@link #enqueue(SocketAddress)} or by the
 * internal disconnect handling is visible to whichever thread reads it next.
 */
private volatile Thread connectionThread;
/**
 * A queue of {@link SocketAddress} instances that should be connected to.
 * The connection thread synchronizes on this list while draining it.
 */
private final LinkedList pendingConnections;
/**
 * A list of socket channels that need to be processed for handshaking with
 * the remote peer.
 */
private final List pendingSockets;
/**
 * A map of datashare channels owned by this container mapped by their ids.
 */
private final Map channels;
/**
 * The parent container of this datashare container, never
 * <code>null</code>.
 */
private final IContainer container;
/**
 * List of IChannelContainerListeners attached to this datashare container.
 */
private final ListenerList listenerList;
/**
 * Instantiates a new datashare container that will connect to remote
 * clients using NIO functionality.
 * <p>
 * A listener is attached to the parent container so that connection and
 * disconnection events are forwarded to this container's channels, and so
 * that all pending work and channels are torn down when the parent
 * disconnects or is disposed.
 * </p>
 *
 * @param container
 *            the parent container of this datashare container, must not be
 *            <code>null</code>
 */
public NIODatashareContainer(IContainer container) {
    Assert.isNotNull(container, "Container cannot be null"); //$NON-NLS-1$
    this.container = container;
    // set up all internal state BEFORE registering the listener: the
    // listener's callbacks synchronize on these collections, so an event
    // delivered while the constructor is still running must not find them
    // uninitialized
    channels = new HashMap();
    pendingConnections = new LinkedList();
    pendingSockets = new ArrayList();
    listenerList = new ListenerList();
    container.addListener(new IContainerListener() {
        public void handleEvent(IContainerEvent event) {
            if (event instanceof IContainerConnectedEvent) {
                ID id = ((IContainerConnectedEvent) event).getTargetID();
                fireChannelConnectedEvent(id);
            } else if (event instanceof IContainerDisconnectedEvent) {
                ID id = ((IContainerDisconnectedEvent) event).getTargetID();
                fireChannelDisconnectedEvent(id);
                disconnect();
            } else if (event instanceof IContainerDisposeEvent) {
                // also invoke disconnect() here in case a disconnection
                // event was never fired
                disconnect();
            }
        }
    });
}
/**
 * Tells every channel owned by this container that the parent container
 * has connected to the given target. The channel map is locked for the
 * duration of the notification.
 *
 * @param containerTargetId
 *            the target id that the parent container has connected to
 */
private void fireChannelConnectedEvent(ID containerTargetId) {
    synchronized (channels) {
        Iterator iterator = channels.values().iterator();
        while (iterator.hasNext()) {
            NIOChannel current = (NIOChannel) iterator.next();
            current.fireChannelConnectEvent(containerTargetId);
        }
    }
}
/**
 * Fires a channel disconnected event to all of this channel container's
 * channels notifying that the parent container has disconnected from the
 * specified target id. The channel map is locked while the channels are
 * being notified.
 *
 * @param containerTargetId
 *            the target id that the parent container has disconnected from
 */
private void fireChannelDisconnectedEvent(ID containerTargetId) {
    synchronized (channels) {
        for (Iterator it = channels.values().iterator(); it.hasNext();) {
            NIOChannel channel = (NIOChannel) it.next();
            channel.fireChannelDisconnectEvent(containerTargetId);
        }
    }
}
/**
 * Delivers a single event to a single listener.
 * <p>
 * The delivery is wrapped in a {@link SafeRunner} so that an exception
 * thrown by the listener is logged via {@link #log(IStatus)} instead of
 * propagating to the caller.
 * </p>
 *
 * @param listener
 *            the listener to notify
 * @param event
 *            the event to notify the listener of
 */
private void fireChannelContainerEvent(
        final IChannelContainerListener listener,
        final IChannelContainerEvent event) {
    ISafeRunnable notification = new ISafeRunnable() {
        public void handleException(Throwable t) {
            IStatus failure = new Status(IStatus.ERROR, Util.PLUGIN_ID,
                    "Error handling channel container event", t); //$NON-NLS-1$
            log(failure);
        }

        public void run() throws Exception {
            listener.handleChannelContainerEvent(event);
        }
    };
    SafeRunner.run(notification);
}
/**
 * Notifies all registered channel container listeners that the channel
 * with the given id has been activated in this container.
 * <p>
 * A single event instance is created and shared by all listeners; nothing
 * is created if there are no listeners.
 * </p>
 *
 * @param channelId
 *            the id of the channel that was activated
 */
private void fireChannelContainerActivatedEvent(final ID channelId) {
    Object[] listeners = listenerList.getListeners();
    if (listeners.length == 0) {
        return;
    }
    // the event does not depend on the listener, so build it only once
    IChannelContainerChannelActivatedEvent event = new IChannelContainerChannelActivatedEvent() {
        public ID getChannelID() {
            return channelId;
        }

        public ID getChannelContainerID() {
            return container.getID();
        }

        public String toString() {
            StringBuffer buffer = new StringBuffer();
            buffer.append("IChannelContainerChannelActivatedEvent["); //$NON-NLS-1$
            buffer.append("container=").append( //$NON-NLS-1$
                    container.getID());
            buffer.append(",channel=").append(channelId); //$NON-NLS-1$
            buffer.append(']');
            return buffer.toString();
        }
    };
    for (int i = 0; i < listeners.length; i++) {
        fireChannelContainerEvent((IChannelContainerListener) listeners[i],
                event);
    }
}
/**
 * Notifies all registered channel container listeners that the channel
 * with the given id has been deactivated in this container.
 * <p>
 * A single event instance is created and shared by all listeners; nothing
 * is created if there are no listeners.
 * </p>
 *
 * @param channelId
 *            the id of the channel that was deactivated
 */
void fireChannelContainerDeactivatedEvent(final ID channelId) {
    Object[] listeners = listenerList.getListeners();
    if (listeners.length == 0) {
        return;
    }
    // the event does not depend on the listener, so build it only once
    IChannelContainerChannelDeactivatedEvent event = new IChannelContainerChannelDeactivatedEvent() {
        public ID getChannelID() {
            return channelId;
        }

        public ID getChannelContainerID() {
            return container.getID();
        }

        public String toString() {
            StringBuffer buffer = new StringBuffer();
            buffer.append("IChannelContainerChannelDeactivatedEvent["); //$NON-NLS-1$
            buffer.append("container=").append( //$NON-NLS-1$
                    container.getID());
            buffer.append(",channel=").append(channelId); //$NON-NLS-1$
            buffer.append(']');
            return buffer.toString();
        }
    };
    for (int i = 0; i < listeners.length; i++) {
        fireChannelContainerEvent((IChannelContainerListener) listeners[i],
                event);
    }
}
/**
 * Logs the specified status.
 * <p>
 * This container reports its internal failures through this method, for
 * example an exception thrown by a channel container listener, an error
 * while disposing a channel, or an IO or deserialization error raised on
 * the connection thread. It may therefore be invoked from a thread other
 * than the one that created this container.
 * </p>
 *
 * @param status
 *            the status to log
 */
protected abstract void log(IStatus status);
/**
 * Stores the specified channel into this container, keyed by the channel's
 * own ID. An existing mapping for the same ID is replaced.
 * <p>
 * The channel map is locked while it is modified because it is also
 * iterated and queried under that same lock elsewhere in this class.
 * </p>
 *
 * @param channel
 *            the channel to store
 */
private void storeChannel(IChannel channel) {
    synchronized (channels) {
        channels.put(channel.getID(), channel);
    }
}
/**
 * Tears down everything this container has set up: interrupts the
 * connection thread (if any), discards all addresses still waiting to be
 * connected to, closes all sockets still waiting for a handshake, and
 * disposes every channel owned by this container.
 * <p>
 * Channels are removed from the container before they are disposed, which
 * mirrors {@link #removeChannel(ID)}. Disposal happens outside of the
 * channel map's lock and on a snapshot, so that code running as part of a
 * channel's disposal can safely call back into this container.
 * </p>
 */
private void disconnect() {
    // read the field once so that a concurrent disconnect() cannot null it
    // out between the check and the interrupt
    Thread thread = connectionThread;
    connectionThread = null;
    if (thread != null) {
        thread.interrupt();
    }

    synchronized (pendingConnections) {
        pendingConnections.clear();
    }

    synchronized (pendingSockets) {
        for (Iterator it = pendingSockets.iterator(); it.hasNext();) {
            Util.closeChannel((SocketChannel) it.next());
        }
        pendingSockets.clear();
    }

    // take a snapshot and empty the map while holding the lock, then
    // dispose without the lock held
    Object[] disposables;
    synchronized (channels) {
        disposables = channels.values().toArray();
        channels.clear();
    }

    for (int i = 0; i < disposables.length; i++) {
        final IChannel channel = (IChannel) disposables[i];
        // dispose the channel in a SafeRunner so exceptions don't
        // prevent us from disposing other channels
        SafeRunner.run(new ISafeRunnable() {
            public void run() throws Exception {
                channel.dispose();
            }

            public void handleException(Throwable t) {
                log(new Status(IStatus.ERROR, Util.PLUGIN_ID,
                        "Error disposing channel: " + channel, t)); //$NON-NLS-1$
            }
        });
    }
}
/**
 * Attempts to connect to every remote address that has been enqueued to
 * this channel container for processing via the
 * {@link #enqueue(SocketAddress)} method.
 * <p>
 * For each address a socket channel is opened, the parent container's
 * connected ID is serialized and written to it, and the socket is then
 * queued for the handshake phase. If the ID cannot be serialized, the
 * socket is closed and this method returns without processing the
 * remaining addresses. A socket that could not be queued for any reason
 * (including an exception) is closed before this method exits.
 * </p>
 * <p>
 * The connection thread invokes this method while holding the lock on the
 * queue of pending connections.
 * </p>
 *
 * @param buffer
 *            the buffer to use for reading and writing data
 * @throws IOException
 *             if an IO error occurs while attempting to contact the peer
 */
private void connect(ByteBuffer buffer) throws IOException {
    while (!pendingConnections.isEmpty()) {
        // retrieve an address to connect to
        SocketAddress remote = (SocketAddress) pendingConnections
                .removeFirst();
        // open a socket channel to the remote address
        SocketChannel socketChannel = SocketChannel.open(remote);
        boolean queued = false;
        try {
            byte[] bytes = Util.serialize(container.getConnectedID());
            if (bytes == null) {
                // serialization failed, the finally block closes the socket
                return;
            }
            socketChannel.configureBlocking(false);
            Util.write(socketChannel, buffer, bytes);
            // the list is also cleared under this lock on disconnection
            synchronized (pendingSockets) {
                pendingSockets.add(socketChannel);
            }
            queued = true;
        } finally {
            if (!queued) {
                // don't leak the socket if we failed before queueing it
                Util.closeChannel(socketChannel);
            }
        }
    }
}
/**
 * Enqueues the specified address to be connected to. This should be invoked
 * after a request has been received from a remote user.
 * <p>
 * The connection thread is started lazily on the first call (and again on
 * the first call after it has been shut down). Both the thread start and
 * the queue insertion are performed while holding the lock on the queue of
 * pending connections, which is the same lock the connection thread holds
 * while draining the queue.
 * </p>
 *
 * @param address
 *            the address to connect to, cannot be <code>null</code>
 * @see NIOChannel#sendRequest(ID)
 */
public void enqueue(SocketAddress address) {
    Assert.isNotNull(address, "Socket address cannot be null"); //$NON-NLS-1$
    synchronized (pendingConnections) {
        if (connectionThread == null) {
            connectionThread = new Thread(new ConnectionRunnable(), getClass()
                    .getName()
                    + "Thread-" + container.getID().toString()); //$NON-NLS-1$
            connectionThread.start();
        }
        pendingConnections.add(address);
    }
}
/**
 * Performs a handshake operation with the remote peer.
 * <p>
 * The received bytes are expected to contain two separately serialized
 * objects: the ID of the channel the peer wants to talk to, followed by the
 * peer's own ID. If a channel with that ID exists in this container, the
 * socket is handed to it together with the peer's ID, and any bytes that
 * follow the two IDs are passed on to the channel as an incoming message.
 * If no such channel exists, or if the handshake fails with an exception
 * before the socket has been handed over, the socket is closed.
 * </p>
 * <p>
 * NOTE(review): the payload originates from the network and is read with
 * {@link ObjectInputStream}; native Java deserialization of data from an
 * untrusted peer is unsafe and should be reviewed.
 * </p>
 *
 * @param socketChannel
 *            the socket channel to handshake with
 * @param data
 *            the data sent from the channel thus far
 * @throws ClassNotFoundException
 *             if a deserialization error occurs
 * @throws IOException
 *             if an IO error occurs while reading or writing data
 */
private void handshake(SocketChannel socketChannel, ChannelData data)
        throws ClassNotFoundException, IOException {
    // tracks whether a channel has taken over the socket; until then this
    // method is responsible for closing it
    boolean handedOver = false;
    try {
        // retrieve the data that was sent
        byte[] message = data.getData();
        // read in the response
        ByteArrayInputStream bais = new ByteArrayInputStream(message);
        ObjectInputStream ois = new ObjectInputStream(bais);
        // first response should be the channel id
        ID channelId = (ID) ois.readObject();
        synchronized (channels) {
            // retrieve the channel that corresponds to that id
            IChannel channel = getChannel(channelId);
            if (channel == null) {
                // can't find a channel that corresponds to the id, the
                // finally block closes the socket
                return;
            }
            // open another ObjectInputStream because each object is
            // serialized separately
            ois = new ObjectInputStream(bais);
            // next id is the id of the remote user
            ID peerId = (ID) ois.readObject();
            // store the peer id and the corresponding socket in the
            // retrieved NIO channel
            NIOChannel datashare = (NIOChannel) channel;
            datashare.put(peerId, socketChannel);
            handedOver = true;
            // check if we have any bytes left to read
            int available = bais.available();
            if (available != 0) {
                // if there are extra bytes that means this is data that
                // the sender has sent to us, we must process these messages
                byte[] received = new byte[available];
                // copy the remaining information
                System.arraycopy(message, message.length - available,
                        received, 0, available);
                // process the received data
                datashare.processIncomingMessage(socketChannel, received);
            }
        }
    } finally {
        if (!handedOver) {
            // nobody owns the socket, don't leak it
            Util.closeChannel(socketChannel);
        }
    }
}
/**
 * Processes any pending sockets that are currently queued up in this
 * channel container and are waiting to initiate the handshake process with
 * the remote peer.
 * <p>
 * Each socket is read once. A socket that the peer has closed is closed
 * locally and dropped; a socket that delivered data goes through the
 * handshake and is dropped from the pending list whether or not the
 * handshake succeeded; a socket that is open but has not delivered data yet
 * stays in the list for the next pass. A socket whose read fails with an
 * IO error is closed and dropped before the error is propagated, so that
 * it cannot fail again on every subsequent pass.
 * </p>
 * <p>
 * The list of pending sockets is locked for the duration of the pass
 * because it is cleared under that same lock on disconnection.
 * </p>
 *
 * @param buffer
 *            the buffer to use for reading and writing data
 * @throws ClassNotFoundException
 *             if a serialization or deserialization operation encountered
 *             errors
 * @throws IOException
 *             if an IO error occurs while reading or writing data
 */
private void processPendingSockets(ByteBuffer buffer)
        throws ClassNotFoundException, IOException {
    synchronized (pendingSockets) {
        for (Iterator it = pendingSockets.iterator(); it.hasNext();) {
            SocketChannel socketChannel = (SocketChannel) it.next();
            buffer.clear();
            // read in the response
            ChannelData data;
            try {
                data = Util.read(socketChannel, buffer);
            } catch (IOException e) {
                // a socket that fails to read would otherwise stay queued
                // and block the sockets behind it on every pass
                Util.closeChannel(socketChannel);
                it.remove();
                throw e;
            }
            if (!data.isOpen()) {
                // the channel isn't open, we should close it on our end
                // also and remove it
                Util.closeChannel(socketChannel);
                it.remove();
            } else if (data.getData() != null) {
                try {
                    handshake(socketChannel, data);
                } finally {
                    // this socket has been processed, remove it
                    it.remove();
                }
            }
        }
    }
}
/**
 * Creates a new NIO-capable channel within this container.
 * <p>
 * This method is invoked by {@link #createChannel(ID, IChannelListener, Map)}
 * after the channel id has been checked for <code>null</code>. If the
 * returned channel is not <code>null</code>, the caller stores it in this
 * container and notifies the channel container listeners of its
 * activation; implementations do not need to do so themselves.
 * </p>
 *
 * @param channelId
 *            the ID of the channel, must not be <code>null</code>
 * @param listener
 *            the listener for receiving notifications pertaining to the
 *            created channel, may be <code>null</code> if no listener needs
 *            to be notified
 * @param properties
 *            a map of properties to provide to this channel, may be
 *            <code>null</code>
 * @return the created NIOChannel instance; if <code>null</code> is
 *         returned, no channel is stored and no activation event is fired
 * @throws ECFException
 *             if an error occurred while creating the channel
 * @see #createChannel(ID, IChannelListener, Map)
 */
protected abstract NIOChannel createNIOChannel(ID channelId,
IChannelListener listener, Map properties) throws ECFException;
/**
 * Creates a new NIO-capable channel within this container.
 * <p>
 * This method is invoked by {@link #createChannel(IChannelConfig)} after
 * the configuration and its id have been checked for <code>null</code>. If
 * the returned channel is not <code>null</code>, the caller stores it in
 * this container and notifies the channel container listeners of its
 * activation; implementations do not need to do so themselves.
 * </p>
 *
 * @param newChannelConfig
 *            the configuration for the newly created channel, must not be
 *            <code>null</code>
 * @return the created NIOChannel instance; if <code>null</code> is
 *         returned, no channel is stored and no activation event is fired
 * @throws ECFException
 *             if an error occurred while creating the channel
 * @see #createChannel(IChannelConfig)
 */
protected abstract NIOChannel createNIOChannel(
IChannelConfig newChannelConfig) throws ECFException;
/**
 * Creates a channel with the given id by delegating to
 * {@link #createNIOChannel(ID, IChannelListener, Map)}. A channel that was
 * actually created is stored in this container and announced to the
 * channel container listeners as activated.
 *
 * @param channelId
 *            the id of the channel, must not be <code>null</code>
 * @param listener
 *            the listener passed on to the created channel
 * @param properties
 *            the properties passed on to the created channel
 * @return the created channel, or <code>null</code> if the delegate did not
 *         create one
 * @throws ECFException
 *             if an error occurred while creating the channel
 */
public final IChannel createChannel(ID channelId,
        IChannelListener listener, Map properties) throws ECFException {
    Assert.isNotNull(channelId, "Channel id cannot be null"); //$NON-NLS-1$
    IChannel created = createNIOChannel(channelId, listener, properties);
    if (created == null) {
        // nothing to store or announce
        return null;
    }
    storeChannel(created);
    fireChannelContainerActivatedEvent(channelId);
    return created;
}
/**
 * Creates a channel from the given configuration by delegating to
 * {@link #createNIOChannel(IChannelConfig)}. A channel that was actually
 * created is stored in this container and announced to the channel
 * container listeners as activated, using the id taken from the
 * configuration.
 *
 * @param newChannelConfig
 *            the configuration of the channel, must not be
 *            <code>null</code> and must have a non-<code>null</code> id
 * @return the created channel, or <code>null</code> if the delegate did not
 *         create one
 * @throws ECFException
 *             if an error occurred while creating the channel
 */
public final IChannel createChannel(IChannelConfig newChannelConfig)
        throws ECFException {
    Assert.isNotNull(newChannelConfig, "Channel config cannot be null"); //$NON-NLS-1$
    Assert.isNotNull(newChannelConfig.getID(),
            "Channel config id cannot be null"); //$NON-NLS-1$
    IChannel created = createNIOChannel(newChannelConfig);
    if (created == null) {
        // nothing to store or announce
        return null;
    }
    storeChannel(created);
    fireChannelContainerActivatedEvent(newChannelConfig.getID());
    return created;
}
/**
 * Adds the given listener to this container's list of channel container
 * listeners. Listeners in that list are notified when a channel is
 * activated or deactivated in this container.
 *
 * @param listener
 *            the listener to add
 */
public void addListener(IChannelContainerListener listener) {
listenerList.add(listener);
}
/**
 * Returns the channel stored in this container under the given id.
 * <p>
 * The lookup is performed while holding the channel map's lock because the
 * map may be modified by other threads.
 * </p>
 *
 * @param channelId
 *            the id of the channel to look up, must not be
 *            <code>null</code>
 * @return the channel with the given id, or <code>null</code> if this
 *         container holds no such channel
 */
public IChannel getChannel(ID channelId) {
    Assert.isNotNull(channelId, "Channel id cannot be null"); //$NON-NLS-1$
    synchronized (channels) {
        return (IChannel) channels.get(channelId);
    }
}
/**
 * Removes the channel with the given id from this container and disposes
 * it.
 * <p>
 * The removal is performed while holding the channel map's lock; the
 * channel itself is disposed after the lock has been released so that the
 * disposal cannot block other threads that need the map.
 * </p>
 *
 * @param channelId
 *            the id of the channel to remove
 * @return <code>true</code> if a channel with the given id was found,
 *         removed and disposed, <code>false</code> if this container holds
 *         no such channel
 */
public boolean removeChannel(ID channelId) {
    IChannel channel;
    synchronized (channels) {
        channel = (IChannel) channels.remove(channelId);
    }
    if (channel == null) {
        return false;
    }
    channel.dispose();
    return true;
}
/**
 * Removes the given listener from this container's list of channel
 * container listeners.
 *
 * @param listener
 *            the listener to remove
 */
public void removeListener(IChannelContainerListener listener) {
listenerList.remove(listener);
}
/**
 * Returns an object of the requested type that is associated with this
 * container.
 *
 * @param adapter
 *            the type to adapt to, may be <code>null</code>
 * @return this container if it is an instance of the requested type, the
 *         parent container if {@link IContainer} itself was requested and
 *         this container is not an instance of it, and <code>null</code>
 *         in all other cases, including a <code>null</code> request
 */
public Object getAdapter(Class adapter) {
    if (adapter != null) {
        if (adapter.isInstance(this)) {
            return this;
        }
        if (adapter == IContainer.class) {
            return container;
        }
    }
    return null;
}
/**
 * Returns a textual representation of this container made up of the
 * runtime class name followed by the parent container in brackets.
 *
 * @return a string describing this container
 */
public String toString() {
    String className = getClass().getName();
    return className + "[parentContainer=" + container + ']'; //$NON-NLS-1$
}
/**
 * The body of the connection thread. Until the thread is interrupted it
 * repeatedly connects to all enqueued addresses, processes the sockets that
 * are waiting for a handshake, and then sleeps briefly before the next
 * pass. Errors raised during a pass are logged and do not terminate the
 * thread.
 */
private class ConnectionRunnable implements Runnable {
    /** Capacity in bytes of the buffer shared by all reads and writes. */
    private static final int BUFFER_SIZE = 1024;

    /** Time in milliseconds to sleep between two passes. */
    private static final long POLL_INTERVAL = 50;

    public void run() {
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        while (!Thread.currentThread().isInterrupted()) {
            try {
                buffer.clear();
                // hold the queue's lock while draining it
                synchronized (pendingConnections) {
                    if (!pendingConnections.isEmpty()) {
                        connect(buffer);
                    }
                }
                processPendingSockets(buffer);
                Thread.sleep(POLL_INTERVAL);
            } catch (InterruptedException e) {
                // we have been asked to stop; restore the interrupt status
                // (it is cleared when the exception is thrown) and exit
                Thread.currentThread().interrupt();
                return;
            } catch (ClassNotFoundException e) {
                log(new Status(IStatus.ERROR, Util.PLUGIN_ID,
                        "Could not deserialize", e)); //$NON-NLS-1$
            } catch (IOException e) {
                log(new Status(IStatus.ERROR, Util.PLUGIN_ID,
                        "An IO error occurred", e)); //$NON-NLS-1$
            } catch (RuntimeException e) {
                log(new Status(IStatus.ERROR, Util.PLUGIN_ID,
                        "A runtime error occurred", e)); //$NON-NLS-1$
            }
        }
    }
}
}