| /****************************************************************************** |
| * Copyright (c) 2009 Remy Chi Jian Suen and others. |
| * All rights reserved. This program and the accompanying materials |
| * are made available under the terms of the Eclipse Public License v1.0 |
| * which accompanies this distribution, and is available at |
| * http://www.eclipse.org/legal/epl-v10.html |
| * |
| * Contributors: |
| * Remy Chi Jian Suen - initial API and implementation |
| ******************************************************************************/ |
| package org.eclipse.ecf.provider.datashare.nio; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.io.ObjectInputStream; |
| import java.io.ObjectOutputStream; |
| import java.net.ServerSocket; |
| import java.net.SocketAddress; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.ServerSocketChannel; |
| import java.nio.channels.SocketChannel; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.eclipse.core.runtime.Assert; |
| import org.eclipse.core.runtime.ISafeRunnable; |
| import org.eclipse.core.runtime.IStatus; |
| import org.eclipse.core.runtime.SafeRunner; |
| import org.eclipse.core.runtime.Status; |
| import org.eclipse.ecf.core.identity.ID; |
| import org.eclipse.ecf.core.util.ECFException; |
| import org.eclipse.ecf.datashare.IChannel; |
| import org.eclipse.ecf.datashare.IChannelListener; |
| import org.eclipse.ecf.datashare.events.IChannelConnectEvent; |
| import org.eclipse.ecf.datashare.events.IChannelDisconnectEvent; |
| import org.eclipse.ecf.datashare.events.IChannelEvent; |
| import org.eclipse.ecf.datashare.events.IChannelMessageEvent; |
| |
| /** |
| * An abstract implementation of <code>IChannel</code> that uses Java 1.4 NIO |
| * APIs for sending and retrieving data. |
| * <p> |
| * This channel will inherently spawn multiple socket connections as messages |
| * are sent to different remote clients via {@link #sendMessage(ID, byte[])}. |
| * Please note that the current implementation does not handle repeated |
| * invocations to that method well. Please refer to its javadoc for further |
| * information. |
| * </p> |
| * <p> |
| * Subclasses must implement the following: |
| * <ul> |
| * <li>For communicating local information for establishing a socket connection: |
| * <ul> |
| * <li>{@link #sendRequest(ID)}</li> |
| * </ul> |
| * </li> |
| * <li>To facilitate the logging of statuses: |
| * <ul> |
| * <li>{@link #log(IStatus)}</li> |
| * </ul> |
| * </li> |
| * </ul> |
| * </p> |
| * <p> |
| * <b>Note:</b> This class/interface is part of an interim API that is still |
| * under development and expected to change significantly before reaching |
| * stability. It is being made available at this early stage to solicit feedback |
| * from pioneering adopters on the understanding that any code that uses this |
| * API will almost certainly be broken (repeatedly) as the API evolves. |
| * </p> |
| */ |
| public abstract class NIOChannel implements IChannel { |
| |
| private NIODatashareContainer datashareContainer; |
| |
| /** |
| * The id of the originating owner container of the datashare container that |
| * created this channel. |
| */ |
| private final ID containerId; |
| |
| /** |
| * The ID of this channel. |
| */ |
| private final ID id; |
| |
| /** |
| * The server socket for listening to incoming connections. This channel is |
| * non-blocking. |
| */ |
| private ServerSocketChannel serverSocketChannel; |
| |
| /** |
| * The port that the server socket is listening on for incoming connections. |
| * |
| * @see #serverSocketChannel |
| * @see #getLocalPort() |
| */ |
| private final int localPort; |
| |
| /** |
| * A map of <code>ID</code>s to their corresponding |
| * <code>SocketChannel</code>s. |
| */ |
| private Map connectedSockets; |
| |
| /** |
| * A list of sockets that is waiting to handshake with remote peers. |
| */ |
| private List pendingSockets; |
| |
| /** |
| * A queue of messages that needs to be sent to remote clients. |
| */ |
| private LinkedList messages; |
| |
| /** |
| * This channel's listener. May be <code>null</code>. |
| */ |
| private IChannelListener listener; |
| |
| /** |
| * The thread responsible for processing incoming messages and sending |
| * messages to remote peers. |
| */ |
| private Thread processingThread; |
| |
| /** |
| * Instantiates a new channel for sending and receiving messages in a |
| * non-blocking manner via sockets. |
| * |
| * @param datashareContainer |
| * the source NIODatashareContainer that created this channel, |
| * cannot be <code>null</code> |
| * @param containerId |
| * the id of the originating owner container, this should |
| * <b>not</b> be the id of the datashare container that created |
| * this channel but the parent container of the datashare |
| * container, may not be <code>null</code> |
| * @param id |
| * the id of this channel, may not be <code>null</code> |
| * @param listener |
| * the channel listener for this channel, may be |
| * <code>null</code> if no notification is required |
| * @throws ECFException |
| * if an error occurred while creating this channel |
| */ |
| public NIOChannel(NIODatashareContainer datashareContainer, ID containerId, |
| ID id, IChannelListener listener) throws ECFException { |
| Assert.isNotNull(datashareContainer, |
| "Datashare container cannot be null"); //$NON-NLS-1$ |
| Assert.isNotNull(containerId, "Container id cannot be null"); //$NON-NLS-1$ |
| Assert.isNotNull(id, "Channel id cannot be null"); //$NON-NLS-1$ |
| |
| this.datashareContainer = datashareContainer; |
| this.containerId = containerId; |
| this.id = id; |
| this.listener = listener; |
| |
| try { |
| // open a server socket |
| serverSocketChannel = ServerSocketChannel.open(); |
| serverSocketChannel.configureBlocking(false); |
| } catch (IOException e) { |
| throw new ECFException(new Status(IStatus.ERROR, Util.PLUGIN_ID, |
| "Could not create server socket", e)); //$NON-NLS-1$ |
| } |
| |
| try { |
| // bind to a local port |
| ServerSocket socket = serverSocketChannel.socket(); |
| socket.bind(getBindAddress(), getBackLog()); |
| } catch (IOException e) { |
| throw new ECFException(new Status(IStatus.ERROR, Util.PLUGIN_ID, |
| "Could not bind server socket", e)); //$NON-NLS-1$ |
| } |
| |
| localPort = serverSocketChannel.socket().getLocalPort(); |
| |
| connectedSockets = new HashMap(); |
| pendingSockets = new ArrayList(); |
| messages = new LinkedList(); |
| |
| processingThread = new Thread(new ProcessingRunnable(), getClass() |
| .getName() |
| + "Thread-" + id.toString()); //$NON-NLS-1$ |
| processingThread.start(); |
| } |
| |
| /** |
| * Fires a channel connected event to this channel's listener if there is |
| * one attached. |
| * |
| * @param containerId |
| * the target ID of the container has connected to |
| */ |
| void fireChannelConnectEvent(final ID containerId) { |
| IChannelListener listener = getListener(); |
| if (listener != null) { |
| fireChannelEvent(listener, new IChannelConnectEvent() { |
| public ID getChannelID() { |
| return id; |
| } |
| |
| public ID getTargetID() { |
| return containerId; |
| } |
| |
| public String toString() { |
| StringBuffer buffer = new StringBuffer(); |
| buffer.append("IChannelConnectEvent["); //$NON-NLS-1$ |
| buffer.append("channel=").append(id); //$NON-NLS-1$ |
| buffer.append(",target=").append(containerId).append(']'); //$NON-NLS-1$ |
| return buffer.toString(); |
| } |
| }); |
| } |
| } |
| |
| /** |
| * Fires a channel disconnected event to this channel's listener if there is |
| * one attached. |
| * |
| * @param containerId |
| * the target ID of the container has disconnected from |
| */ |
| void fireChannelDisconnectEvent(final ID containerId) { |
| IChannelListener listener = getListener(); |
| if (listener != null) { |
| fireChannelEvent(listener, new IChannelDisconnectEvent() { |
| public ID getChannelID() { |
| return id; |
| } |
| |
| public ID getTargetID() { |
| return containerId; |
| } |
| |
| public String toString() { |
| StringBuffer buffer = new StringBuffer(); |
| buffer.append("IChannelDisconnectEvent["); //$NON-NLS-1$ |
| buffer.append("channel=").append(id); //$NON-NLS-1$ |
| buffer.append(",target=").append(containerId).append(']'); //$NON-NLS-1$ |
| return buffer.toString(); |
| } |
| }); |
| } |
| } |
| |
| protected abstract void log(IStatus status); |
| |
| /** |
| * Returns the address that this channel's server socket should bind to. If |
| * <code>null</code>, a default port and valid local address will be used. |
| * |
| * @return this channel's server socket's bind address, may be |
| * <code>null</code> if a default should be used |
| */ |
| protected SocketAddress getBindAddress() { |
| return null; |
| } |
| |
| /** |
| * Retrieves the listen backlog length of this channel's server socket. If |
| * the value is less than or equal to zero, the default length is used. |
| * |
| * @return this channel's server socket's listen backlog length |
| */ |
| protected int getBackLog() { |
| return 0; |
| } |
| |
| /** |
| * Sends any pending messages we may have queued up. |
| */ |
| private void sendPendingMessages() { |
| Collection deadSockets = null; |
| Collection processedMessages = null; |
| |
| for (Iterator it = messages.iterator(); it.hasNext();) { |
| ChannelMessage message = (ChannelMessage) it.next(); |
| ID id = message.getId(); |
| SocketChannel channel = (SocketChannel) connectedSockets.get(id); |
| // check if we have a socket for the target of this message |
| if (channel != null) { |
| byte[] data = message.getData(); |
| |
| try { |
| // flush the data directly with regular IO, this method |
| // saves us the extra work of having to constantly flip and |
| // clear a ByteBuffer and in a way ensures the message is |
| // sent in one piece instead of chunks |
| channel.configureBlocking(true); |
| channel.socket().getOutputStream().write(data); |
| channel.socket().getOutputStream().flush(); |
| // turn off blocking |
| channel.configureBlocking(false); |
| } catch (IOException e) { |
| log(new Status(IStatus.ERROR, Util.PLUGIN_ID, |
| "Error occurred while sending message", e)); //$NON-NLS-1$ |
| if (deadSockets == null) { |
| deadSockets = new HashSet(); |
| } |
| deadSockets.add(id); |
| } |
| |
| if (processedMessages == null) { |
| processedMessages = new LinkedList(); |
| } |
| // store the processed message |
| processedMessages.add(message); |
| } |
| } |
| |
| // remove all messages that have been processed |
| if (processedMessages != null) { |
| messages.removeAll(processedMessages); |
| } |
| |
| if (deadSockets != null) { |
| for (Iterator it = deadSockets.iterator(); it.hasNext();) { |
| ID id = (ID) it.next(); |
| SocketChannel channel = (SocketChannel) connectedSockets |
| .remove(id); |
| Util.closeChannel(channel); |
| } |
| } |
| } |
| |
| /** |
| * Reads in any incoming messages from remote clients. |
| * |
| * @param buffer |
| * the buffer to use for reading the socket |
| * @throws IOException |
| * if an error occurs while reading from the socket |
| */ |
| private void processIncomingMessages(ByteBuffer buffer) throws IOException { |
| Collection deadSockets = null; |
| |
| for (Iterator it = connectedSockets.entrySet().iterator(); it.hasNext();) { |
| Map.Entry entry = (Map.Entry) it.next(); |
| SocketChannel socketChannel = (SocketChannel) entry.getValue(); |
| |
| try { |
| if (!processIncomingMessages(socketChannel, buffer)) { |
| if (deadSockets == null) { |
| deadSockets = new HashSet(); |
| } |
| deadSockets.add(entry.getKey()); |
| } |
| } catch (IOException e) { |
| log(new Status(IStatus.ERROR, Util.PLUGIN_ID, |
| "Error occurred while sending message", e)); //$NON-NLS-1$ |
| if (deadSockets == null) { |
| deadSockets = new HashSet(); |
| } |
| deadSockets.add(entry.getKey()); |
| } |
| } |
| |
| if (deadSockets != null) { |
| for (Iterator it = deadSockets.iterator(); it.hasNext();) { |
| ID id = (ID) it.next(); |
| SocketChannel channel = (SocketChannel) connectedSockets |
| .remove(id); |
| Util.closeChannel(channel); |
| } |
| } |
| } |
| |
| /** |
| * Processes any incoming messages from the specified channel by reading it |
| * into the specified buffer and returns whether the channel has reached |
| * end-of-stream. |
| * |
| * @param socketChannel |
| * the channel to read messages from |
| * @param buffer |
| * the buffer to use to read the messages into |
| * @return <code>true</code> if the channel is still active, |
| * <code>false</code> has reached end-of-stream |
| * @throws IOException |
| * if an error occurred while trying to read from the channel |
| */ |
| private boolean processIncomingMessages(SocketChannel socketChannel, |
| ByteBuffer buffer) throws IOException { |
| ChannelData channelData = Util.read(socketChannel, buffer); |
| byte[] message = channelData.getData(); |
| |
| if (message != null) { |
| processIncomingMessage(socketChannel, message); |
| } |
| return channelData.isOpen(); |
| } |
| |
| /** |
| * Processes the message that has been received from the specified channel. |
| * |
| * @param socketChannel |
| * the channel that the message was from |
| * @param message |
| * the message that was received |
| */ |
| void processIncomingMessage(SocketChannel socketChannel, byte[] message) { |
| // we read something, need to notify |
| IChannelListener listener = getListener(); |
| if (listener != null) { |
| // we have a listener, convert our data and then notify |
| byte[][] messages = convert(message); |
| if (messages != null) { |
| fireMessageEvents(listener, socketChannel, messages); |
| } |
| } |
| } |
| |
| /** |
| * Converts the data that has been read from the socket into separate byte[] |
| * instances. |
| * |
| * @param message |
| * the data read from the socket |
| * @return a byte[][] containing the individual messages |
| */ |
| private byte[][] convert(byte[] message) { |
| try { |
| // back the read in data with a ByteArrayInputStream |
| ByteArrayInputStream bais = new ByteArrayInputStream(message); |
| // instantiate an ObjectInputStream and read the individual |
| // byte[] |
| byte[] bytes = (byte[]) new ObjectInputStream(bais).readObject(); |
| |
| if (bais.available() == 0) { |
| return new byte[][] { bytes }; |
| } |
| |
| Collection c = new ArrayList(); |
| c.add(bytes); |
| |
| while (bais.available() != 0) { |
| // instantiate an ObjectInputStream and read the individual |
| // byte[] |
| bytes = (byte[]) new ObjectInputStream(bais).readObject(); |
| // store it |
| c.add(bytes); |
| } |
| |
| // return all the individual byte[]s |
| return (byte[][]) c.toArray(new byte[c.size()][]); |
| } catch (IOException e) { |
| return null; |
| } catch (ClassNotFoundException e) { |
| return null; |
| } |
| } |
| |
| /** |
| * Fires message events to the specified listener for each of the message |
| * that was received. |
| * |
| * @param listener |
| * the listener to notify |
| * @param socketChannel |
| * the socket that the message was read from |
| * @param messages |
| * the messages that have been received |
| */ |
| private void fireMessageEvents(IChannelListener listener, |
| SocketChannel socketChannel, byte[][] messages) { |
| for (int i = 0; i < messages.length; i++) { |
| IChannelEvent event = createMessageEvent(socketChannel, messages[i]); |
| if (event != null) { |
| fireChannelEvent(listener, event); |
| } |
| } |
| } |
| |
| /** |
| * Notifies the specified listener of the given channel event. The code is |
| * run within a SafeRunner to ensure that the program flow is not affected |
| * in the event of errors during notification. |
| * |
| * @param listener |
| * the listener to notify |
| * @param event |
| * the event to fire |
| */ |
| private void fireChannelEvent(final IChannelListener listener, |
| final IChannelEvent event) { |
| // use a SafeRunner to send out the notification to ensure that |
| // client-side failures do not cause the channel to die |
| SafeRunner.run(new ISafeRunnable() { |
| public void run() throws Exception { |
| listener.handleChannelEvent(event); |
| } |
| |
| public void handleException(Throwable t) { |
| log(new Status(IStatus.ERROR, Util.PLUGIN_ID, |
| "Error handling channel event", t)); //$NON-NLS-1$ |
| } |
| }); |
| } |
| |
| /** |
| * Creates and returns a message event corresponding to the specified |
| * channel and the data that was read. |
| * |
| * @param channel |
| * the socket channel that the message was from |
| * @param data |
| * the message from the remote peer |
| * @return a message event describing the received message, may be |
| * <code>null</code> if the channel could not be identified |
| */ |
| private IChannelEvent createMessageEvent(SocketChannel channel, |
| final byte[] data) { |
| // search for the id of the corresponding channel |
| for (Iterator it = connectedSockets.entrySet().iterator(); it.hasNext();) { |
| Map.Entry entry = (Map.Entry) it.next(); |
| if (channel == entry.getValue()) { |
| final ID fromId = (ID) entry.getKey(); |
| |
| return new IChannelMessageEvent() { |
| public byte[] getData() { |
| return data; |
| } |
| |
| public ID getFromContainerID() { |
| return fromId; |
| } |
| |
| public ID getChannelID() { |
| return id; |
| } |
| |
| public String toString() { |
| StringBuffer buffer = new StringBuffer(); |
| buffer.append("IChannelMessageEvent["); //$NON-NLS-1$ |
| buffer.append("container=").append(fromId); //$NON-NLS-1$ |
| buffer.append(",channel=").append(id); //$NON-NLS-1$ |
| buffer.append(",data=").append(data).append(']'); //$NON-NLS-1$ |
| return buffer.toString(); |
| } |
| }; |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * Stores the specified ID with its corresponding socket into this channel. |
| * The socket will now be actively used for reading and sending messages. |
| * |
| * @param id |
| * the target that the socket is connected with |
| * @param socketChannel |
| * the socket channel to be stored |
| */ |
| void put(ID id, SocketChannel socketChannel) { |
| connectedSockets.put(id, socketChannel); |
| } |
| |
| /** |
| * Accept the socket as a potential client and attempt to handshake with it. |
| * |
| * @param socketChannel |
| * the socket to establish a connection with |
| * @throws ClassNotFoundException |
| * if the class of an object being deserialized could not be |
| * found |
| * @throws IOException |
| * if a networking error occurs with the socket while |
| * reading/sending messages |
| */ |
| // private boolean accept2(SocketChannel socketChannel, ByteBuffer buffer) |
| // throws ClassNotFoundException, IOException { |
| // int read = socketChannel.read(buffer); |
| // buffer.flip(); |
| // byte[] bytes = new byte[read]; |
| // buffer.get(bytes); |
| // buffer.clear(); |
| // |
| // ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream( |
| // bytes)); |
| // Object object = ois.readObject(); |
| // |
| // if (object instanceof ID) { |
| // bytes = serialize(id); |
| // socketChannel.socket().getOutputStream().write(bytes); |
| // |
| // bytes = serialize(containerId); |
| // socketChannel.socket().getOutputStream().write(bytes); |
| // socketChannel.socket().getOutputStream().flush(); |
| // |
| // socketChannel.configureBlocking(false); |
| // put((ID) object, socketChannel); |
| // } |
| // return true; |
| // } |
| |
| /** |
| * Performs a handshake with a remote peer via the provided socket channel |
| * and returns whether <code>true</code> if no further attempts are |
| * required. Note that <code>true</code> does not indicate that the |
| * handshake has been successful. |
| * <p> |
| * For example, if the remote peer has closed this channel then |
| * <code>true</code> would be returned as no further attempts should be |
| * attempted. |
| * </p> |
| * |
| * @param socketChannel |
| * the channel to use to handshake with the remote peer |
| * @param buffer |
| * the buffer to use for reading and writing data from the |
| * channel |
| * @return <code>true</code> if no further handshake attempts are required, |
| * <code>false</code> otherwise |
| * @throws ClassNotFoundException |
| * if deserialization failed during the handshake |
| * @throws IOException |
| * if an IO error occurred while performing the handshake |
| */ |
| private boolean handshake(SocketChannel socketChannel, ByteBuffer buffer) |
| throws ClassNotFoundException, IOException { |
| ChannelData data = Util.read(socketChannel, buffer); |
| if (!data.isOpen()) { |
| // this channel is dead, close it |
| Util.closeChannel(socketChannel); |
| return true; |
| } |
| |
| byte[] bytes = data.getData(); |
| if (bytes == null) { |
| return false; |
| } |
| |
| ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream( |
| bytes)); |
| Object object = ois.readObject(); |
| |
| if (object instanceof ID) { |
| socketChannel.configureBlocking(true); |
| byte[] one = Util.serialize(id); |
| byte[] two = Util.serialize(containerId); |
| bytes = new byte[one.length + two.length]; |
| System.arraycopy(one, 0, bytes, 0, one.length); |
| System.arraycopy(two, 0, bytes, one.length, two.length); |
| |
| socketChannel.socket().getOutputStream().write(bytes); |
| socketChannel.socket().getOutputStream().flush(); |
| |
| socketChannel.configureBlocking(false); |
| put((ID) object, socketChannel); |
| } |
| return true; |
| } |
| |
| /** |
| * Returns the port that is currently open for incoming socket connections. |
| * |
| * @return the open port for socket connections |
| * @see #sendRequest(ID) |
| * @see NIODatashareContainer#enqueue(SocketAddress) |
| */ |
| protected final int getLocalPort() { |
| return localPort; |
| } |
| |
| /** |
| * Sends a request to the receiver to notify them that a socket is open and |
| * waiting for incoming connections to establish a channel connection. It is |
| * up to the client to decide how this request should be sent as the |
| * communication channel between one client and another is entirely |
| * dependent on the underlying provider's networking protocol. |
| * <p> |
| * This method will be invoked when a socket corresponding to the receiver's |
| * ID cannot be found. |
| * </p> |
| * |
| * @param receiver |
| * the receiver to contact, will not be <code>null</code> |
| * @throws ECFException |
| * if an error occurred while attempting to send the request |
| * @see #getLocalPort() |
| * @see NIODatashareContainer#enqueue(SocketAddress) |
| */ |
| protected abstract void sendRequest(ID receiver) throws ECFException; |
| |
| public void sendMessage(byte[] message) throws ECFException { |
| throw new ECFException(new Status(IStatus.ERROR, Util.PLUGIN_ID,"A receiver must be specified, see sendMessage(ID, byte[])")); |
| } |
| |
| /** |
| * Sends a message to a remote instance of this channel of the target peer. |
| * <p> |
| * <b>Note:</b> The current implementation does not handle repeated |
| * invocations of this method in succession prior to a socket connection |
| * established. For optimal performance and some assurance of success, there |
| * needs to be a time lag between the first message that is sent and the |
| * ones that follow it. This lag should hopefully allow the provider |
| * sufficient time for establishing a socket connection with the remote |
| * peer. Otherwise, there may be multiple invocations of |
| * {@link #sendRequest(ID)} and clients are responsible for handling this |
| * individually. |
| * </p> |
| * |
| * @param receiver |
| * the receiver to send the message to, must not be |
| * <code>null</code> |
| * @param message |
| * the message to send, must not be <code>null</code> |
| */ |
| public void sendMessage(ID receiver, byte[] message) throws ECFException { |
| Assert.isNotNull(receiver, "A receiver must be specified"); //$NON-NLS-1$ |
| Assert.isNotNull(message, "Message cannot be null"); //$NON-NLS-1$ |
| |
| // check if we already have a socket for this receiver |
| if (!connectedSockets.containsKey(receiver)) { |
| // send a request to the receiver for establishing a socket |
| // connection |
| sendRequest(receiver); |
| } |
| |
| synchronized (messages) { |
| // enqueue the message for processing |
| messages.add(new ChannelMessage(receiver, message)); |
| } |
| } |
| |
| /** |
| * Disposes of this channel. Clients may extend to perform additional |
| * clean-up but <b>must</b> call <code>super.dispose()</code> before the |
| * method returns. |
| */ |
| public void dispose() { |
| processingThread.interrupt(); |
| |
| try { |
| // turn off the server to prevent and deny incoming connections |
| if (serverSocketChannel != null) { |
| serverSocketChannel.close(); |
| serverSocketChannel = null; |
| } |
| } catch (IOException e) { |
| // ignored |
| serverSocketChannel = null; |
| } |
| |
| synchronized (connectedSockets) { |
| // close all connections |
| for (Iterator it = connectedSockets.values().iterator(); it |
| .hasNext();) { |
| SocketChannel socket = (SocketChannel) it.next(); |
| Util.closeChannel(socket); |
| } |
| |
| connectedSockets.clear(); |
| } |
| |
| datashareContainer.fireChannelContainerDeactivatedEvent(id); |
| } |
| |
| public IChannelListener getListener() { |
| return listener; |
| } |
| |
| public IChannelListener setListener(IChannelListener listener) { |
| IChannelListener previous = this.listener; |
| this.listener = listener; |
| return previous; |
| } |
| |
| public Object getAdapter(Class adapter) { |
| if (adapter != null && adapter.isInstance(this)) { |
| return this; |
| } |
| return null; |
| } |
| |
| public ID getID() { |
| return id; |
| } |
| |
| private final class ProcessingRunnable implements Runnable { |
| |
| public void run() { |
| ByteBuffer buffer = ByteBuffer.allocate(1024); |
| while (true) { |
| try { |
| Thread.sleep(50); |
| |
| if (Thread.currentThread().isInterrupted()) { |
| return; |
| } |
| |
| // perform handshaking for any pending sockets |
| for (int i = 0; i < pendingSockets.size(); i++) { |
| SocketChannel channel = (SocketChannel) pendingSockets |
| .get(i); |
| if (handshake(channel, buffer)) { |
| // remove if handled |
| pendingSockets.remove(i); |
| i--; |
| } |
| } |
| |
| processIncomingMessages(buffer); |
| |
| // check if we have pending messages to send |
| synchronized (messages) { |
| if (!messages.isEmpty()) { |
| sendPendingMessages(); |
| } |
| } |
| |
| SocketChannel socketChannel = serverSocketChannel.accept(); |
| if (socketChannel != null) { |
| socketChannel.configureBlocking(false); |
| pendingSockets.add(socketChannel); |
| } |
| } catch (InterruptedException e) { |
| Thread.interrupted(); |
| return; |
| } catch (ClassNotFoundException e) { |
| log(new Status(IStatus.ERROR, Util.PLUGIN_ID, |
| "Could not deserialize", e)); //$NON-NLS-1$ |
| } catch (IOException e) { |
| log(new Status(IStatus.ERROR, Util.PLUGIN_ID, |
| "An IO error occurred", e)); //$NON-NLS-1$ |
| } catch (RuntimeException e) { |
| log(new Status(IStatus.ERROR, Util.PLUGIN_ID, |
| "A runtime error occurred", e)); //$NON-NLS-1$ |
| } |
| } |
| } |
| } |
| |
| private final class ChannelMessage { |
| |
| private ID fromId; |
| private byte[] data; |
| |
| ChannelMessage(ID fromId, byte[] data) throws ECFException { |
| this.fromId = fromId; |
| this.data = convert(data); |
| } |
| |
| private byte[] convert(byte[] data) throws ECFException { |
| try { |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| ObjectOutputStream oos = new ObjectOutputStream(baos); |
| oos.writeObject(data); |
| return baos.toByteArray(); |
| } catch (IOException e) { |
| throw new ECFException(e); |
| } |
| } |
| |
| public ID getId() { |
| return fromId; |
| } |
| |
| public byte[] getData() { |
| return data; |
| } |
| |
| } |
| |
| } |