| /* Copyright (c) 2006-2009 Jan S. Rellermeyer |
| * Systems Group, |
| * Department of Computer Science, ETH Zurich. |
| * All rights reserved. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are met: |
| * - Redistributions of source code must retain the above copyright notice, |
| * this list of conditions and the following disclaimer. |
| * - Redistributions in binary form must reproduce the above copyright |
| * notice, this list of conditions and the following disclaimer in the |
| * documentation and/or other materials provided with the distribution. |
| * - Neither the name of ETH Zurich nor the names of its contributors may be |
| * used to endorse or promote products derived from this software without |
| * specific prior written permission. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
| * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| * POSSIBILITY OF SUCH DAMAGE. |
| */ |
| package ch.ethz.iks.r_osgi.impl; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.NotSerializableException; |
| import java.io.OutputStream; |
| import java.lang.reflect.InvocationTargetException; |
| import java.lang.reflect.Method; |
| import java.text.SimpleDateFormat; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Date; |
| import java.util.Dictionary; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Hashtable; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.osgi.framework.Bundle; |
| import org.osgi.framework.BundleException; |
| import org.osgi.framework.Constants; |
| import org.osgi.framework.Filter; |
| import org.osgi.framework.ServiceRegistration; |
| import org.osgi.service.event.Event; |
| import org.osgi.service.event.EventAdmin; |
| import org.osgi.service.event.EventConstants; |
| import org.osgi.service.event.EventHandler; |
| import org.osgi.service.log.LogService; |
| |
| import ch.ethz.iks.r_osgi.AsyncRemoteCallCallback; |
| import ch.ethz.iks.r_osgi.RemoteOSGiException; |
| import ch.ethz.iks.r_osgi.RemoteOSGiService; |
| import ch.ethz.iks.r_osgi.RemoteServiceEvent; |
| import ch.ethz.iks.r_osgi.RemoteServiceReference; |
| import ch.ethz.iks.r_osgi.URI; |
| import ch.ethz.iks.r_osgi.channels.ChannelEndpoint; |
| import ch.ethz.iks.r_osgi.channels.NetworkChannel; |
| import ch.ethz.iks.r_osgi.channels.NetworkChannelFactory; |
| import ch.ethz.iks.r_osgi.messages.DeliverBundlesMessage; |
| import ch.ethz.iks.r_osgi.messages.DeliverServiceMessage; |
| import ch.ethz.iks.r_osgi.messages.LeaseMessage; |
| import ch.ethz.iks.r_osgi.messages.LeaseUpdateMessage; |
| import ch.ethz.iks.r_osgi.messages.RemoteCallMessage; |
| import ch.ethz.iks.r_osgi.messages.RemoteCallResultMessage; |
| import ch.ethz.iks.r_osgi.messages.RemoteEventMessage; |
| import ch.ethz.iks.r_osgi.messages.RemoteOSGiMessage; |
| import ch.ethz.iks.r_osgi.messages.RequestBundleMessage; |
| import ch.ethz.iks.r_osgi.messages.RequestDependenciesMessage; |
| import ch.ethz.iks.r_osgi.messages.RequestServiceMessage; |
| import ch.ethz.iks.r_osgi.messages.StreamRequestMessage; |
| import ch.ethz.iks.r_osgi.messages.StreamResultMessage; |
| import ch.ethz.iks.r_osgi.messages.TimeOffsetMessage; |
| import ch.ethz.iks.r_osgi.streams.InputStreamHandle; |
| import ch.ethz.iks.r_osgi.streams.InputStreamProxy; |
| import ch.ethz.iks.r_osgi.streams.OutputStreamHandle; |
| import ch.ethz.iks.r_osgi.streams.OutputStreamProxy; |
| import ch.ethz.iks.util.CollectionUtils; |
| import ch.ethz.iks.util.StringUtils; |
| |
| /** |
| * <p> |
| * The endpoint of a network channel encapsulates most of the communication |
| * logic like sending of messages, service method invocation, timestamp |
| * synchronization, and event delivery. |
| * </p> |
| * <p> |
| * Endpoints exchange symmetric leases when they are established. These leases |
| * contain the statements of supply and demand. The peer states the services it |
| * offers and the event topics it is interested in. Whenever one of these |
| * statements undergo a change, a lease update has to be sent. Leases expire |
| * with the closing of the network channel and the two endpoints. |
| * </p> |
| * <p> |
| * The network transport of channels is modular and exchangeable. Services can |
| * state the supported protocols in their service uri. R-OSGi maintains a list |
| * of network channel factories and the protocols they support. Each channel |
| * uses exactly one protocol. |
| * <p> |
| * <p> |
| * When the network channel breaks down, the channel endpoint tries to reconnect |
| * and to restore the connection. If this is not possible (for instance, because |
| * the other endpoint is not available any more, the endpoint is unregistered. |
| * </p> |
| * |
| * @author Jan S. Rellermeyer, ETH Zurich |
| */ |
| public final class ChannelEndpointImpl implements ChannelEndpoint { |
| |
| int usageCounter = 1; |
| |
| /** |
| * the channel. |
| */ |
| protected NetworkChannel networkChannel; |
| |
| /** |
| * the services provided by the OSGi framework holding the remote channel |
| * endpoint. Map of service URI -> RemoteServiceReferences |
| */ |
| private Map remoteServices = new HashMap(0); |
| |
| /** |
| * the topics of interest of the OSGi framework holding the remote channel |
| * endpoint. List of topic strings |
| */ |
| private List remoteTopics = new ArrayList(0); |
| |
| /** |
| * the time offset between this peer's local time and the local time of the |
| * remote channel endpoint. |
| */ |
| private TimeOffset timeOffset; |
| |
| /** |
| * Timeout. |
| */ |
| private static final int TIMEOUT = new Integer(System.getProperty( |
| "ch.ethz.iks.r_osgi.channelEndpointImpl.timeout", "120000")) |
| .intValue(); |
| |
| /** |
| * the callback register |
| */ |
| protected final Map callbacks = new HashMap(0); |
| |
| /** |
| * map of service uri -> RemoteServiceRegistration. |
| */ |
| private final HashMap localServices = new HashMap(2); |
| |
| /** |
| * map of service uri -> service registration. |
| */ |
| private final HashMap proxiedServices = new HashMap(0); |
| |
| /** |
| * map of service uri -> proxy bundle. If the endpoint is closed, the |
| * proxies are unregistered. |
| */ |
| protected final HashMap proxyBundles = new HashMap(0); |
| /** |
| * map of stream id -> stream instance. |
| */ |
| private final HashMap streams = new HashMap(0); |
| |
| /** |
| * next stream id. |
| */ |
| private short nextStreamID = 0; |
| |
| /** |
| * the handler registration, if the remote topic space is not empty. |
| */ |
| private ServiceRegistration handlerReg = null; |
| |
| /** |
| * filter for events to prevent loops in the remote delivery if the peers |
| * connected by this channel have non-disjoint topic spaces. |
| */ |
| private static final String NO_LOOPS = "(&(!(" //$NON-NLS-1$ |
| + RemoteEventMessage.EVENT_SENDER_URI + "=*))" //$NON-NLS-1$ |
| // [TCK][r-OSGi] NonSerializableException when running remoteserviceadmin ct |
| // https://bugs.eclipse.org/418740 |
| + "(!(" + EventConstants.EVENT_TOPIC |
| + "=org/osgi/service/remoteserviceadmin/*))" //$NON-NLS-1$ |
| + ")"; //$NON-NLS-1$ |
| |
| private ArrayList workQueue = new ArrayList(); |
| |
| /** |
| * used by the multiplexer and serves as a marker whether or not the channel |
| * may dispose itself when the connection went down. |
| */ |
| boolean hasRedundantLinks = false; |
| |
| boolean traceChannelEndpoint = new Boolean(System.getProperty("ch.ethz.iks.r_osgi.impl.traceChannelEndpoint","false")).booleanValue(); |
| |
| void trace(String message) { |
| trace(message, null); |
| } |
| |
| void trace(String message, Throwable t) { |
| if (!traceChannelEndpoint) return; |
| if (message != null) |
| System.out.println("ChannelEndpoint;"+message); |
| if (t != null) |
| t.printStackTrace(); |
| } |
| |
| /** |
| * create a new channel endpoint. |
| * |
| * @param factory |
| * the transport channel factory. |
| * @param endpointAddress |
| * the address of the remote endpoint. |
| * @throws RemoteOSGiException |
| * if something goes wrong in R-OSGi. |
| * @throws IOException |
| * if something goes wrong on the network layer. |
| */ |
| ChannelEndpointImpl(final NetworkChannelFactory factory, |
| final URI endpointAddress) throws RemoteOSGiException, IOException { |
| trace("<init>(factory="+factory+",endpointAddress="+endpointAddress+")"); |
| networkChannel = factory.getConnection(this, endpointAddress); |
| if (RemoteOSGiServiceImpl.DEBUG) { |
| RemoteOSGiServiceImpl.log.log(LogService.LOG_DEBUG, |
| "opening new channel " + getRemoteAddress()); //$NON-NLS-1$ |
| } |
| initThreadPool(); |
| RemoteOSGiServiceImpl.registerChannelEndpoint(this); |
| } |
| |
| /** |
| * create a new channel endpoint from an incoming connection. |
| * |
| * @param channel |
| * the network channel of the incoming connection. |
| */ |
| ChannelEndpointImpl(final NetworkChannel channel) { |
| trace("<init>(channel="+channel+";remoteAddress="+channel.getRemoteAddress()+";localAddress="+channel.getLocalAddress()+")"); |
| networkChannel = channel; |
| channel.bind(this); |
| initThreadPool(); |
| RemoteOSGiServiceImpl.registerChannelEndpoint(this); |
| } |
| |
| /** |
| * initialize the thread pool |
| */ |
| private void initThreadPool() { |
| // TODO: tradeoff, could as well be central for all endpoints... |
| final ThreadGroup threadPool = new ThreadGroup("WorkerThreads" |
| + toString()); |
| // Set this thread pool to be daemon threads |
| threadPool.setDaemon(true); |
| for (int i = 0; i < RemoteOSGiServiceImpl.MAX_THREADS_PER_ENDPOINT; i++) { |
| final Thread t = new Thread(threadPool, "r-OSGi ChannelWorkerThread" + i) { |
| public void run() { |
| try { |
| while (!isInterrupted()) { |
| final Runnable r; |
| synchronized (workQueue) { |
| while (workQueue.isEmpty()) { |
| workQueue.wait(); |
| } |
| r = (Runnable) workQueue.remove(0); |
| } |
| r.run(); |
| } |
| } catch (InterruptedException ie) { |
| ie.printStackTrace(); |
| } |
| } |
| }; |
| t.start(); |
| } |
| } |
| |
| /** |
| * process a recieved message. Called by the channel. |
| * |
| * @param msg |
| * the received message. |
| * @see ch.ethz.iks.r_osgi.channels.ChannelEndpoint#receivedMessage(ch.ethz.iks.r_osgi.messages.RemoteOSGiMessage) |
| * @category ChannelEndpoint |
| */ |
| public void receivedMessage(final RemoteOSGiMessage msg) { |
| if (msg == null) { |
| dispose(); |
| return; |
| } |
| final Integer xid = new Integer(msg.getXID()); |
| final WaitingCallback callback; |
| synchronized (callbacks) { |
| callback = (WaitingCallback) callbacks.remove(xid); |
| } |
| if (callback != null) { |
| callback.result(msg); |
| return; |
| } else { |
| final Runnable r = new Runnable() { |
| public void run() { |
| final RemoteOSGiMessage reply = handleMessage(msg); |
| if (reply != null) { |
| |
| try { |
| trace("reply(msg="+reply+";remoteAddress="+networkChannel.getRemoteAddress()+")"); |
| networkChannel.sendMessage(reply); |
| } catch (final NotSerializableException nse) { |
| throw new RemoteOSGiException("Error sending " //$NON-NLS-1$ |
| + reply, nse); |
| } catch (NullPointerException npe) { |
| // channel got closed |
| } catch (final IOException e) { |
| dispose(); |
| } |
| } |
| } |
| }; |
| synchronized (workQueue) { |
| workQueue.add(r); |
| workQueue.notify(); |
| } |
| } |
| } |
| |
| /** |
| * invoke a method on the remote host. This function is used by all proxy |
| * bundles. |
| * |
| * @param service |
| * the service uri. |
| * @param methodSignature |
| * the method signature. |
| * @param args |
| * the method parameter. |
| * @throws Throwable |
| * can throw any exception that the original method can throw, |
| * plus RemoteOSGiException. |
| * @return the result of the remote method invocation. |
| * @see ch.ethz.iks.r_osgi.channels.ChannelEndpoint#invokeMethod(java.lang.String, |
| * java.lang.String, java.lang.Object[]) |
| * @category ChannelEndpoint |
| */ |
| public Object invokeMethod(final String service, |
| final String methodSignature, final Object[] args) throws Throwable { |
| if (networkChannel == null) { |
| throw new RemoteOSGiException("Channel is closed"); //$NON-NLS-1$ |
| } |
| // check arguments for streams and replace with placeholder |
| for (int i = 0; i < args.length; i++) { |
| if (args[i] instanceof InputStream) { |
| args[i] = getInputStreamPlaceholder((InputStream) args[i]); |
| } else if (args[i] instanceof OutputStream) { |
| args[i] = getOutputStreamPlaceholder((OutputStream) args[i]); |
| } |
| } |
| |
| final RemoteCallMessage invokeMsg = new RemoteCallMessage(); |
| invokeMsg.setServiceID(URI.create(service).getFragment()); |
| invokeMsg.setMethodSignature(methodSignature); |
| invokeMsg.setArgs(args); |
| |
| try { |
| // send the message and get a MethodResultMessage in return |
| final RemoteCallResultMessage resultMsg = (RemoteCallResultMessage) sendAndWait(invokeMsg); |
| if (resultMsg.causedException()) { |
| throw resultMsg.getException(); |
| } |
| final Object result = resultMsg.getResult(); |
| if (result instanceof InputStreamHandle) { |
| return getInputStreamProxy((InputStreamHandle) result); |
| } else if (result instanceof OutputStreamHandle) { |
| return getOutputStreamProxy((OutputStreamHandle) result); |
| } else { |
| return result; |
| } |
| } catch (final RemoteOSGiException e) { |
| throw new RemoteOSGiException("Method invocation of " //$NON-NLS-1$ |
| + service + " " + methodSignature + " failed.", e); //$NON-NLS-1$ //$NON-NLS-2$ |
| } |
| } |
| |
| void asyncRemoteCall(final String fragment, final String methodSignature, |
| final Object[] args, final AsyncRemoteCallCallback callback) { |
| if (networkChannel == null) { |
| throw new RemoteOSGiException("Channel is closed"); //$NON-NLS-1$ |
| } |
| // check arguments for streams and replace with placeholder |
| for (int i = 0; i < args.length; i++) { |
| if (args[i] instanceof InputStream) { |
| args[i] = getInputStreamPlaceholder((InputStream) args[i]); |
| } else if (args[i] instanceof OutputStream) { |
| args[i] = getOutputStreamPlaceholder((OutputStream) args[i]); |
| } |
| } |
| |
| final Integer xid = new Integer(RemoteOSGiServiceImpl.nextXid()); |
| |
| synchronized (callbacks) { |
| callbacks.put(xid, new AsyncCallback() { |
| public void result(final RemoteOSGiMessage msg) { |
| final RemoteCallResultMessage resultMsg = (RemoteCallResultMessage) msg; |
| if (resultMsg.causedException()) { |
| callback.remoteCallResult(false, resultMsg |
| .getException()); |
| } |
| final Object result = resultMsg.getResult(); |
| final Object res; |
| if (result instanceof InputStreamHandle) { |
| res = getInputStreamProxy((InputStreamHandle) result); |
| } else if (result instanceof OutputStreamHandle) { |
| res = getOutputStreamProxy((OutputStreamHandle) result); |
| } else { |
| res = result; |
| } |
| callback.remoteCallResult(true, res); |
| } |
| }); |
| } |
| |
| final RemoteCallMessage invokeMsg = new RemoteCallMessage(); |
| invokeMsg.setServiceID(fragment); |
| invokeMsg.setMethodSignature(methodSignature); |
| invokeMsg.setArgs(args); |
| invokeMsg.setXID(xid.shortValue()); |
| |
| try { |
| send(invokeMsg); |
| } catch (final RemoteOSGiException e) { |
| callbacks.remove(xid); |
| callback |
| .remoteCallResult( |
| false, |
| new RemoteOSGiException( |
| "Method invocation of " //$NON-NLS-1$ |
| + getRemoteAddress() |
| + "#" + fragment + " " + methodSignature + " failed.", e)); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ |
| } |
| } |
| |
| /** |
| * get the attributes of a service. This function is used to simplify proxy |
| * bundle generation. |
| * |
| * @param serviceID |
| * the serviceID of the remote service. |
| * @return the service attributes. |
| * @category ChannelEndpoint |
| */ |
| public Dictionary getProperties(final String serviceID) { |
| return getRemoteReference(serviceID).getProperties(); |
| } |
| |
| /** |
| * get the attributes for the presentation of the service. This function is |
| * used by proxies that support ServiceUI presentations. |
| * |
| * @param serviceID |
| * the serviceID of the remote service. |
| * @return the presentation attributes. |
| * @category ChannelEndpoint |
| */ |
| public Dictionary getPresentationProperties(final String serviceID) { |
| final Dictionary attribs = new Hashtable(); |
| attribs.put(RemoteOSGiService.SERVICE_URI, serviceID); |
| attribs.put(RemoteOSGiService.PRESENTATION, getRemoteReference( |
| serviceID).getProperty(RemoteOSGiService.PRESENTATION)); |
| return attribs; |
| } |
| |
| /** |
| * track the registration of a proxy service. |
| * |
| * @param serviceID |
| * the service ID. |
| * @param reg |
| * the service registration. |
| * @category ChannelEndpoint |
| */ |
| public void trackRegistration(final String serviceID, |
| final ServiceRegistration reg) { |
| proxiedServices.put(serviceID, reg); |
| } |
| |
| /** |
| * untrack the registration of a proxy service. |
| * |
| * @param serviceID |
| * the service ID. |
| * @category ChannelEndpoint |
| */ |
| public void untrackRegistration(final String serviceID) { |
| proxiedServices.remove(serviceID); |
| } |
| |
| /** |
| * get the temporal offset of a remote peer. |
| * |
| * @return the TimeOffset. |
| * @throws RemoteOSGiException |
| * in case of network errors. |
| */ |
| public TimeOffset getOffset() throws RemoteOSGiException { |
| if (timeOffset == null) { |
| // if unknown, perform a initial offset measurement round of 4 |
| // messages |
| TimeOffsetMessage timeMsg = new TimeOffsetMessage(); |
| for (int i = 0; i < 4; i++) { |
| timeMsg.timestamp(); |
| timeMsg = (TimeOffsetMessage) sendAndWait(timeMsg); |
| } |
| timeOffset = new TimeOffset(timeMsg.getTimeSeries()); |
| } else if (timeOffset.isExpired()) { |
| // if offset has expired, start a new measurement round |
| TimeOffsetMessage timeMsg = new TimeOffsetMessage(); |
| for (int i = 0; i < timeOffset.seriesLength(); i += 2) { |
| timeMsg.timestamp(); |
| timeMsg = (TimeOffsetMessage) sendAndWait(timeMsg); |
| } |
| timeOffset.update(timeMsg.getTimeSeries()); |
| } |
| return timeOffset; |
| } |
| |
| /** |
| * dispose the channel. |
| * |
| * @category ChannelEndpoint |
| */ |
| public void dispose() { |
| if (networkChannel == null) { |
| return; |
| } |
| |
| if (RemoteOSGiServiceImpl.DEBUG) { |
| RemoteOSGiServiceImpl.log.log(LogService.LOG_DEBUG, |
| "DISPOSING ENDPOINT " + getRemoteAddress()); //$NON-NLS-1$ |
| } |
| |
| RemoteOSGiServiceImpl.unregisterChannelEndpoint(getRemoteAddress() |
| .toString()); |
| if (handlerReg != null) { |
| handlerReg.unregister(); |
| } |
| |
| final NetworkChannel oldchannel = networkChannel; |
| networkChannel = null; |
| |
| try { |
| oldchannel.close(); |
| } catch (final IOException ioe) { |
| ioe.printStackTrace(); |
| } |
| |
| if (!hasRedundantLinks) { |
| // inform all listeners about all services |
| final RemoteServiceReference[] refs = (RemoteServiceReference[]) remoteServices |
| .values().toArray( |
| new RemoteServiceReference[remoteServices.size()]); |
| for (int i = 0; i < refs.length; i++) { |
| RemoteOSGiServiceImpl |
| .notifyRemoteServiceListeners(new RemoteServiceEvent( |
| RemoteServiceEvent.UNREGISTERING, refs[i])); |
| } |
| |
| // uninstall the proxy bundle |
| final Bundle[] proxies = (Bundle[]) proxyBundles.values().toArray( |
| new Bundle[proxyBundles.size()]); |
| |
| for (int i = 0; i < proxies.length; i++) { |
| try { |
| if (proxies[i].getState() != Bundle.UNINSTALLED) { |
| proxies[i].uninstall(); |
| } |
| } catch (final Throwable t) { |
| |
| } |
| } |
| } |
| |
| remoteServices = null; |
| remoteTopics = null; |
| timeOffset = null; |
| callbacks.clear(); |
| localServices.clear(); |
| proxiedServices.clear(); |
| closeStreams(); |
| streams.clear(); |
| handlerReg = null; |
| synchronized (callbacks) { |
| callbacks.notifyAll(); |
| } |
| } |
| |
| public boolean isConnected() { |
| return networkChannel != null; |
| } |
| |
| public String toString() { |
| if (networkChannel == null) { |
| throw new RemoteOSGiException("Channel is closed"); //$NON-NLS-1$ |
| } |
| return "ChannelEndpoint(" + networkChannel.toString() + ")"; //$NON-NLS-1$ //$NON-NLS-2$ |
| } |
| |
| /** |
| * read a byte from the input stream on the peer identified by id. |
| * |
| * @param streamID |
| * the ID of the stream. |
| * @return result of the read operation. |
| * @throws IOException |
| * when an IOException occurs. |
| */ |
| public int readStream(final short streamID) throws IOException { |
| final StreamRequestMessage requestMsg = new StreamRequestMessage(); |
| requestMsg.setOp(StreamRequestMessage.READ); |
| requestMsg.setStreamID(streamID); |
| final StreamResultMessage resultMsg = doStreamOp(requestMsg); |
| return resultMsg.getResult(); |
| } |
| |
| /** |
| * read to an array from the input stream on the peer identified by id. |
| * |
| * @param streamID |
| * the ID of the stream. |
| * @param b |
| * the array to write the result to. |
| * @param off |
| * the offset for the destination array. |
| * @param len |
| * the number of bytes to read. |
| * @return number of bytes actually read. |
| * @throws IOException |
| * when an IOException occurs. |
| */ |
| public int readStream(final short streamID, final byte[] b, final int off, |
| final int len) throws IOException { |
| // handle special cases as defined in InputStream |
| if (b == null) { |
| throw new NullPointerException(); |
| } |
| if ((off < 0) || (len < 0) || (len + off > b.length)) { |
| throw new IndexOutOfBoundsException(); |
| } |
| if (len == 0) { |
| return 0; |
| } |
| final StreamRequestMessage requestMsg = new StreamRequestMessage(); |
| requestMsg.setOp(StreamRequestMessage.READ_ARRAY); |
| requestMsg.setStreamID(streamID); |
| requestMsg.setLenOrVal(len); |
| final StreamResultMessage resultMsg = doStreamOp(requestMsg); |
| final int length = resultMsg.getLen(); |
| // check the length first, could be -1 indicating EOF |
| if (length > 0) { |
| final byte[] readdata = resultMsg.getData(); |
| // copy result to byte array at correct offset |
| System.arraycopy(readdata, 0, b, off, length); |
| } |
| return length; |
| } |
| |
| /** |
| * write a byte to the output stream on the peer identified by id. |
| * |
| * @param streamID |
| * the ID of the stream. |
| * @param b |
| * the value. |
| * @throws IOException |
| * when an IOException occurs. |
| */ |
| public void writeStream(final short streamID, final int b) |
| throws IOException { |
| final StreamRequestMessage requestMsg = new StreamRequestMessage(); |
| requestMsg.setOp(StreamRequestMessage.WRITE); |
| requestMsg.setStreamID(streamID); |
| requestMsg.setLenOrVal(b); |
| // wait for the stream operation to finish |
| doStreamOp(requestMsg); |
| } |
| |
| /** |
| * write bytes from array to output stream on the peer identified by id. |
| * |
| * @param streamID |
| * the ID of the stream. |
| * @param b |
| * the source array. |
| * @param off |
| * offset into the source array. |
| * @param len |
| * number of bytes to copy. |
| * @throws IOException |
| * when an IOException occurs. |
| */ |
| public void writeStream(final short streamID, final byte[] b, |
| final int off, final int len) throws IOException { |
| // handle special cases as defined in OutputStream |
| if (b == null) { |
| throw new NullPointerException(); |
| } |
| if ((off < 0) || (len < 0) || (len + off > b.length)) { |
| throw new IndexOutOfBoundsException(); |
| } |
| final byte[] data = new byte[len]; |
| System.arraycopy(b, off, data, 0, len); |
| |
| final StreamRequestMessage requestMsg = new StreamRequestMessage(); |
| requestMsg.setOp(StreamRequestMessage.WRITE_ARRAY); |
| requestMsg.setStreamID(streamID); |
| requestMsg.setData(data); |
| requestMsg.setLenOrVal(len); |
| // wait for the stream operation to finish |
| doStreamOp(requestMsg); |
| } |
| |
| /** |
| * get the channel URI. |
| * |
| * @return the channel ID. |
| * @category ChannelEndpoint |
| */ |
| public URI getRemoteAddress() { |
| if (networkChannel == null) { |
| throw new RemoteOSGiException("Channel is closed"); //$NON-NLS-1$ |
| } |
| return networkChannel.getRemoteAddress(); |
| } |
| |
| /** |
| * get hte local address. |
| * |
| * @return |
| */ |
| URI getLocalAddress() { |
| if (networkChannel == null) { |
| throw new RemoteOSGiException("Channel is closed"); //$NON-NLS-1$ |
| } |
| return networkChannel.getLocalAddress(); |
| } |
| |
| /** |
| * send a lease. |
| * |
| * @param myServices |
| * the services of this peer. |
| * @param myTopics |
| * the topics of this peer. |
| * @return the remote service references of the other peer. |
| */ |
| RemoteServiceReference[] sendLease( |
| final RemoteServiceRegistration[] myServices, |
| final String[] myTopics) { |
| final LeaseMessage l = new LeaseMessage(); |
| populateLease(l, myServices, myTopics); |
| final LeaseMessage lease = (LeaseMessage) sendAndWait(l); |
| return processLease(lease); |
| } |
| |
| /** |
| * send a lease update. |
| * |
| * @param msg |
| * a lease update message. |
| */ |
| void sendLeaseUpdate(final LeaseUpdateMessage msg) { |
| send(msg); |
| } |
| |
| /** |
| * is the other side still reachable? |
| * |
| * @param uri |
| * the remote endpoint address. |
| * @return true if the other side is reachable. |
| */ |
| boolean isActive(final String uri) { |
| return remoteServices.get(uri) != null; |
| } |
| |
| /** |
| * fetch the service from the remote peer. |
| * |
| * @param ref |
| * the remote service reference. |
| * @throws IOException |
| * in case of network errors. |
| */ |
| void getProxyBundle(final RemoteServiceReference ref) throws IOException, |
| RemoteOSGiException { |
| if (networkChannel == null) { |
| throw new RemoteOSGiException("Channel is closed."); //$NON-NLS-1$ |
| } |
| |
| // build the RequestServiceMessage |
| final RequestServiceMessage req = new RequestServiceMessage(); |
| req.setServiceID(ref.getURI().getFragment()); |
| |
| // send the RequestServiceMessage and get a DeliverServiceMessage in |
| // return. The DeliverServiceMessage contains a minimal description of |
| // the resources |
| // of a proxy bundle. This is the service interface plus type injections |
| // plus import/export |
| // declarations for the bundle. |
| final DeliverServiceMessage deliv = (DeliverServiceMessage) sendAndWait(req); |
| |
| // generate a proxy bundle for the service |
| final InputStream in = new ProxyGenerator().generateProxyBundle(ref |
| .getURI(), deliv); |
| |
| installResolveAndStartBundle(ref, in, true); |
| } |
| |
| private void installResolveAndStartBundle(final RemoteServiceReference ref, |
| final InputStream in, final boolean isProxy) { |
| try { |
| |
| final Bundle bundle = RemoteOSGiActivator.getActivator() |
| .getContext().installBundle(ref.getURI().toString(), in); |
| |
| retrieveDependencies((String) bundle.getHeaders().get( |
| Constants.IMPORT_PACKAGE), (String) bundle.getHeaders() |
| .get(Constants.EXPORT_PACKAGE)); |
| |
| if (isProxy) { |
| // store the bundle for state updates and cleanup |
| proxyBundles.put(ref.getURI().getFragment(), bundle); |
| } |
| |
| // start the bundle |
| bundle.start(); |
| |
| } catch (final BundleException e) { |
| final Throwable nested = e.getNestedException() == null ? e : e |
| .getNestedException(); |
| //nested.printStackTrace(); |
| throw new RemoteOSGiException( |
| "Could not install the generated bundle " + ref.toString(), //$NON-NLS-1$ |
| nested); |
| } |
| } |
| |
| /** |
| * get a clone of a remote bundle |
| * |
| * @param ref |
| * the remote service reference to the service for which the |
| * bundle clone is requested. |
| */ |
| void getCloneBundle(final RemoteServiceReference ref) { |
| if (networkChannel == null) { |
| throw new RemoteOSGiException("Channel is closed."); //$NON-NLS-1$ |
| } |
| |
| // build the RequestBundleMessage |
| final RequestBundleMessage req = new RequestBundleMessage(); |
| req.setServiceID(ref.getURI().getFragment()); |
| |
| final DeliverBundlesMessage deliv = (DeliverBundlesMessage) sendAndWait(req); |
| |
| final byte[] bundleBytes = deliv.getDependencies()[0]; |
| |
| installResolveAndStartBundle(ref, |
| new ByteArrayInputStream(bundleBytes), false); |
| } |
| |
| /** |
| * tokenize a package import/export string |
| * |
| * @param str |
| * the string |
| * @return the tokens |
| */ |
| private String[] getTokens(final String str) { |
| final ArrayList result = new ArrayList(); |
| //final StringTokenizer tokenizer = new StringTokenizer(str, ","); |
| final String[] tokens = StringUtils.stringToArray(str, ","); |
| |
| for (int i=0; i<tokens.length; i++) { |
| final int pos; |
| // TODO: handle versions for R4! |
| final String pkg = (pos = tokens[i].indexOf(";")) > -1 ? tokens[i] |
| .substring(0, pos).trim() : tokens[i].trim(); |
| if (!RemoteOSGiServiceImpl.checkPackageImport(pkg)) { |
| result.add(pkg); |
| } |
| } |
| |
| return (String[]) result.toArray(new String[result.size()]); |
| } |
| |
| /** |
| * get the missing dependencies from remote for a given bundle defined by |
| * its declared package import and exports. |
| * |
| * @param importString |
| * the declared package imports |
| * @param exportString |
| * the declared package exports |
| */ |
| private void retrieveDependencies(final String importString, |
| final String exportString) { |
| |
| final Set exports = new HashSet(Arrays.asList(getTokens(exportString))); |
| final Set imports = new HashSet(Arrays.asList(getTokens(importString))); |
| |
| final String[] missing = (String[]) CollectionUtils.rightDifference( |
| imports, exports).toArray(new String[0]); |
| |
| if (missing.length > 0) { |
| final RequestDependenciesMessage req = new RequestDependenciesMessage(); |
| req.setPackages(missing); |
| final DeliverBundlesMessage deps = (DeliverBundlesMessage) sendAndWait(req); |
| final byte[][] depBytes = deps.getDependencies(); |
| for (int i = 0; i < depBytes.length; i++) { |
| try { |
| RemoteOSGiActivator.getActivator().getContext() |
| .installBundle("r-osgi://dep/" + missing[i], |
| new ByteArrayInputStream(depBytes[i])); |
| } catch (BundleException be) { |
| be.printStackTrace(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * get the remote reference for a given serviceID. |
| * |
| * @param serviceID |
| * the uri. |
| * @return the remote service reference, or <code>null</code>. |
| */ |
| RemoteServiceReferenceImpl getRemoteReference(final String uri) { |
| return (RemoteServiceReferenceImpl) remoteServices.get(uri); |
| } |
| |
| /** |
| * Get the remote references. |
| * |
| * @param filter |
| * a filter, or <code>null</code>. |
| * @return all remote service references which match the filter. |
| */ |
| RemoteServiceReference[] getAllRemoteReferences(final Filter filter) { |
| final List result = new ArrayList(); |
| final RemoteServiceReferenceImpl[] refs = (RemoteServiceReferenceImpl[]) remoteServices |
| .values().toArray( |
| new RemoteServiceReferenceImpl[remoteServices.size()]); |
| if (filter == null) { |
| return refs.length > 0 ? refs : null; |
| } else { |
| for (int i = 0; i < refs.length; i++) { |
| if (filter.match(refs[i].getProperties())) { |
| result.add(refs[i]); |
| } |
| } |
| final RemoteServiceReference[] refs2 = (RemoteServiceReference[]) result |
| .toArray(new RemoteServiceReferenceImpl[result.size()]); |
| return refs2.length > 0 ? refs2 : null; |
| } |
| } |
| |
| /** |
| * release the remote service. This leads to an uninstallation of the proxy |
| * bundle. |
| * |
| * @param uri |
| * the uri of the service. |
| */ |
| void ungetRemoteService(final URI uri) { |
| try { |
| Bundle bundle = (Bundle) proxyBundles.remove(uri.getFragment()); |
| // see https://bugs.eclipse.org/420897 |
| if (bundle != null) { |
| bundle.uninstall(); |
| } else { |
| RemoteOSGiServiceImpl.log |
| .log(LogService.LOG_WARNING, |
| "failed to uninstall non-existant bundle " + uri.getFragment()); //$NON-NLS-1$ |
| } |
| } catch (final BundleException be) { |
| // TODO Could a BE indicate that the old proxy bundle (which failed |
| // to uninstall) will prevent future proxy bundles from |
| // installing/resolving? |
| } |
| } |
| |
| private long startTime; |
| |
| public static final String TRACE_TIME_PROP = System.getProperty("ch.ethz.iks.r_osgi.traceSendMessageTime"); |
| |
| private static boolean TRACE_TIME = false; |
| private static boolean USE_LOG_SERVICE = true; |
| |
| static { |
| if (TRACE_TIME_PROP != null) { |
| if (TRACE_TIME_PROP.equalsIgnoreCase("logservice") || TRACE_TIME_PROP.equalsIgnoreCase("true")) { |
| TRACE_TIME = true; |
| } else if (TRACE_TIME_PROP.equalsIgnoreCase("systemout")) { |
| TRACE_TIME = true; |
| USE_LOG_SERVICE = false; |
| } |
| } |
| } |
| |
| private static final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS"); |
| |
| void startTiming(String message) { |
| if (TRACE_TIME) { |
| startTime = System.currentTimeMillis(); |
| StringBuffer buf = new StringBuffer("TIMING.START;"); |
| buf.append(sdf.format(new Date(startTime))).append(";"); |
| buf.append((message==null?"":message)); |
| LogService logService = RemoteOSGiServiceImpl.log; |
| if (logService != null && USE_LOG_SERVICE) |
| logService.log(LogService.LOG_INFO, buf.toString()); |
| else |
| System.out.println(buf.toString()); |
| } |
| } |
| |
| void stopTiming(String message, Throwable exception) { |
| if (TRACE_TIME) { |
| StringBuffer buf = new StringBuffer("TIMING.END;"); |
| buf.append(sdf.format(new Date(startTime))).append(";"); |
| buf.append((message==null?"":message)); |
| buf.append(";duration(ms)=").append((System.currentTimeMillis()-startTime)); |
| LogService logService = RemoteOSGiServiceImpl.log; |
| if (logService != null && USE_LOG_SERVICE) { |
| if (exception != null) |
| logService.log(LogService.LOG_ERROR, buf.toString(), exception); |
| else |
| logService.log(LogService.LOG_INFO, buf.toString()); |
| } else { |
| System.out.println(buf.toString()); |
| if (exception != null) |
| exception.printStackTrace(); |
| } |
| startTime = 0; |
| } |
| } |
| |
| /** |
| * send a message. |
| * |
| * @param msg |
| * a message. |
| */ |
| void send(final RemoteOSGiMessage msg) { |
| if (networkChannel == null) { |
| throw new RemoteOSGiException("Channel is closed"); //$NON-NLS-1$ |
| } |
| |
| if (msg.getXID() == 0) { |
| msg.setXID(RemoteOSGiServiceImpl.nextXid()); |
| } |
| |
| Throwable t = null; |
| String timingMsg = "sendMessage;funcId="+msg.getFuncID()+";xid="+msg.getXID(); |
| startTiming(timingMsg); |
| |
| try { |
| try { |
| trace("send(msg="+msg+";remoteAddress="+networkChannel.getRemoteAddress()+")"); |
| networkChannel.sendMessage(msg); |
| return; |
| } catch (final IOException ioe) { |
| // TimeOffsetMessages have to be handled differently |
| // must send a new message with a new timestamp and XID |
| // instead |
| // of sending the same message again |
| if (msg instanceof TimeOffsetMessage) { |
| ((TimeOffsetMessage) msg).restamp(RemoteOSGiServiceImpl |
| .nextXid()); |
| networkChannel.sendMessage(msg); |
| } else { |
| networkChannel.sendMessage(msg); |
| } |
| } |
| } catch (final NotSerializableException nse) { |
| t = new RemoteOSGiException("Error sending " + msg, nse); //$NON-NLS-1$ |
| throw ((RemoteOSGiException) t); |
| } catch (final IOException ioe) { |
| // failed to reconnect... |
| dispose(); |
| t = new RemoteOSGiException("Network error", ioe); //$NON-NLS-1$ |
| throw ((RemoteOSGiException) t); |
| } finally { |
| stopTiming(timingMsg,t); |
| } |
| } |
| |
| /** |
| * message handler method. |
| * |
| * @param msg |
| * the incoming message. |
| * @return if reply is created, null otherwise. |
| * @throws RemoteOSGiException |
| * if something goes wrong. |
| */ |
| RemoteOSGiMessage handleMessage(final RemoteOSGiMessage msg) |
| throws RemoteOSGiException { |
| |
| trace("handleMessage(msg="+msg+";remoteAddress="+networkChannel.getRemoteAddress()+")"); |
| switch (msg.getFuncID()) { |
| // requests |
| case RemoteOSGiMessage.LEASE: { |
| final LeaseMessage lease = (LeaseMessage) msg; |
| processLease(lease); |
| |
| populateLease(lease, RemoteOSGiServiceImpl.getServices(), |
| RemoteOSGiServiceImpl.getTopics()); |
| return lease; |
| } |
| case RemoteOSGiMessage.REQUEST_SERVICE: { |
| final RequestServiceMessage reqSrv = (RequestServiceMessage) msg; |
| final String serviceID = reqSrv.getServiceID(); |
| |
| final RemoteServiceRegistration reg = getServiceRegistration(serviceID); |
| |
| final DeliverServiceMessage m = reg.getDeliverServiceMessage(); |
| m.setXID(reqSrv.getXID()); |
| m.setServiceID(reqSrv.getServiceID()); |
| return m; |
| } |
| case RemoteOSGiMessage.LEASE_UPDATE: { |
| final LeaseUpdateMessage suMsg = (LeaseUpdateMessage) msg; |
| |
| final String serviceID = suMsg.getServiceID(); |
| final short stateUpdate = suMsg.getType(); |
| |
| final String serviceURI = getRemoteAddress() |
| .resolve("#" + serviceID).toString(); |
| |
| switch (stateUpdate) { |
| case LeaseUpdateMessage.TOPIC_UPDATE: { |
| // There is an older r-OSGi version that incorrectly sends an ArrayList |
| // (1.0.0.RC4_v20131016-1848) |
| Object topicsAdded = suMsg.getPayload()[0]; |
| if (topicsAdded instanceof List) { |
| topicsAdded = ((List) topicsAdded).toArray(new String[0]); |
| } |
| Object topicsRemoved = suMsg.getPayload()[1]; |
| if (topicsRemoved instanceof List) { |
| topicsRemoved = ((List) topicsRemoved).toArray(new String[0]); |
| } |
| updateTopics((String[]) topicsAdded, (String[]) topicsRemoved); |
| return null; |
| } |
| case LeaseUpdateMessage.SERVICE_ADDED: { |
| final Dictionary properties = (Dictionary) suMsg.getPayload()[1]; |
| sanitizeServiceProperties(properties, serviceURI); |
| final RemoteServiceReferenceImpl ref = new RemoteServiceReferenceImpl( |
| (String[]) suMsg.getPayload()[0], serviceID, |
| properties, this); |
| |
| remoteServices.put(serviceURI, ref); |
| |
| RemoteOSGiServiceImpl |
| .notifyRemoteServiceListeners(new RemoteServiceEvent( |
| RemoteServiceEvent.REGISTERED, ref)); |
| |
| return null; |
| } |
| case LeaseUpdateMessage.SERVICE_MODIFIED: { |
| final Dictionary properties = (Dictionary) suMsg.getPayload()[1]; |
| sanitizeServiceProperties(properties, serviceURI); |
| final ServiceRegistration reg = (ServiceRegistration) proxiedServices |
| .get(serviceID); |
| if (reg != null) { |
| reg.setProperties(properties); |
| } |
| |
| final RemoteServiceReferenceImpl ref = getRemoteReference(serviceURI); //$NON-NLS-1$ |
| // If r-OSGi receives a SERVICE_MODIFIED for service X before it |
| // knows about X (SERVICE_ADDED), there is no point in updating |
| // the local service instance. It will fail with an NPE anyway. |
| // (see https://bugs.eclipse.org/420433) |
| if (ref == null && reg == null) { |
| return null; |
| } |
| ref.setProperties(properties); |
| RemoteOSGiServiceImpl |
| .notifyRemoteServiceListeners(new RemoteServiceEvent( |
| RemoteServiceEvent.MODIFIED, ref)); |
| return null; |
| } |
| case LeaseUpdateMessage.SERVICE_REMOVED: { |
| if (networkChannel == null) { |
| return null; |
| } |
| final RemoteServiceReference ref = (RemoteServiceReference) remoteServices |
| .remove(serviceURI); |
| if (ref != null) { |
| RemoteOSGiServiceImpl |
| .notifyRemoteServiceListeners(new RemoteServiceEvent( |
| RemoteServiceEvent.UNREGISTERING, ref)); |
| } |
| |
| final Bundle bundle = (Bundle) proxyBundles.remove(serviceID); |
| if (bundle != null) { |
| try { |
| bundle.uninstall(); |
| } catch (final BundleException be) { |
| be.printStackTrace(); |
| } |
| proxiedServices.remove(serviceID); |
| remoteServices.remove(serviceURI); //$NON-NLS-1$ |
| } |
| return null; |
| } |
| } |
| return null; |
| } |
| case RemoteOSGiMessage.REMOTE_CALL: { |
| final RemoteCallMessage invMsg = (RemoteCallMessage) msg; |
| try { |
| RemoteServiceRegistration serv = (RemoteServiceRegistration) localServices |
| .get(invMsg.getServiceID()); |
| if (serv == null) { |
| final RemoteServiceRegistration reg = getServiceRegistration(invMsg |
| .getServiceID()); |
| if (reg == null) { |
| throw new IllegalStateException(toString() |
| + "Could not get " + invMsg.getServiceID() //$NON-NLS-1$ |
| + ", known services " + localServices); //$NON-NLS-1$ |
| } else { |
| serv = reg; |
| } |
| } |
| |
| // get the invocation arguments and the local method |
| final Object[] arguments = invMsg.getArgs(); |
| for (int i = 0; i < arguments.length; i++) { |
| if (arguments[i] instanceof InputStreamHandle) { |
| arguments[i] = getInputStreamProxy((InputStreamHandle) arguments[i]); |
| } else if (arguments[i] instanceof OutputStreamHandle) { |
| arguments[i] = getOutputStreamProxy((OutputStreamHandle) arguments[i]); |
| } |
| } |
| |
| final Method method = serv.getMethod(invMsg |
| .getMethodSignature()); |
| |
| // invoke method |
| try { |
| Object result = method.invoke(serv.getServiceObject(), |
| arguments); |
| final RemoteCallResultMessage m = new RemoteCallResultMessage(); |
| m.setXID(invMsg.getXID()); |
| if (result instanceof InputStream) { |
| m |
| .setResult(getInputStreamPlaceholder((InputStream) result)); |
| } else if (result instanceof OutputStream) { |
| m |
| .setResult(getOutputStreamPlaceholder((OutputStream) result)); |
| } else { |
| m.setResult(result); |
| } |
| return m; |
| } catch (final InvocationTargetException t) { |
| t.printStackTrace(); |
| throw t.getTargetException(); |
| } |
| } catch (final Throwable t) { |
| // TODO: send to log |
| t.printStackTrace(); |
| final RemoteCallResultMessage m = new RemoteCallResultMessage(); |
| m.setXID(invMsg.getXID()); |
| m.setException(t); |
| return m; |
| } |
| } |
| case RemoteOSGiMessage.REMOTE_EVENT: { |
| final RemoteEventMessage eventMsg = (RemoteEventMessage) msg; |
| final Dictionary properties = eventMsg.getProperties(); |
| |
| // transform the event timestamps |
| final Long remoteTs; |
| if ((remoteTs = (Long) properties.get(EventConstants.TIMESTAMP)) != null) { |
| properties.put(EventConstants.TIMESTAMP, getOffset().transform( |
| remoteTs)); |
| } |
| |
| final Event event = new Event(eventMsg.getTopic(), properties); |
| |
| // and deliver the event to the local framework |
| if (RemoteOSGiServiceImpl.eventAdminTracker.getTrackingCount() > 0) { |
| ((EventAdmin) RemoteOSGiServiceImpl.eventAdminTracker |
| .getService()).postEvent(event); |
| } else { |
| // TODO: to log |
| System.err.println("Could not deliver received event: " + event //$NON-NLS-1$ |
| + ". No EventAdmin available."); //$NON-NLS-1$ |
| } |
| return null; |
| } |
| case RemoteOSGiMessage.TIME_OFFSET: { |
| // add timestamp to the message and return the message to sender |
| ((TimeOffsetMessage) msg).timestamp(); |
| return msg; |
| } |
| case RemoteOSGiMessage.STREAM_REQUEST: { |
| final StreamRequestMessage reqMsg = (StreamRequestMessage) msg; |
| try { |
| // fetch stream object |
| final Object stream = streams.get(new Integer(reqMsg |
| .getStreamID())); |
| if (stream == null) { |
| throw new IllegalStateException( |
| "Could not get stream with ID " //$NON-NLS-1$ |
| + reqMsg.getStreamID()); |
| } |
| // invoke operation on stream |
| switch (reqMsg.getOp()) { |
| case StreamRequestMessage.READ: { |
| final int result = ((InputStream) stream).read(); |
| final StreamResultMessage m = new StreamResultMessage(); |
| m.setXID(reqMsg.getXID()); |
| m.setResult((short) result); |
| return m; |
| } |
| case StreamRequestMessage.READ_ARRAY: { |
| final byte[] b = new byte[reqMsg.getLenOrVal()]; |
| final int len = ((InputStream) stream).read(b, 0, reqMsg |
| .getLenOrVal()); |
| final StreamResultMessage m = new StreamResultMessage(); |
| m.setXID(reqMsg.getXID()); |
| m.setResult(StreamResultMessage.RESULT_ARRAY); |
| m.setLen(len); |
| if (len > 0) { |
| m.setData(b); |
| } |
| return m; |
| } |
| case StreamRequestMessage.WRITE: { |
| ((OutputStream) stream).write(reqMsg.getLenOrVal()); |
| final StreamResultMessage m = new StreamResultMessage(); |
| m.setXID(reqMsg.getXID()); |
| m.setResult(StreamResultMessage.RESULT_WRITE_OK); |
| return m; |
| } |
| case StreamRequestMessage.WRITE_ARRAY: { |
| ((OutputStream) stream).write(reqMsg.getData()); |
| final StreamResultMessage m = new StreamResultMessage(); |
| m.setXID(reqMsg.getXID()); |
| m.setResult(StreamResultMessage.RESULT_WRITE_OK); |
| return m; |
| } |
| default: |
| throw new RemoteOSGiException( |
| "Unimplemented op code for stream request " + msg); //$NON-NLS-1$ |
| } |
| } catch (final IOException e) { |
| final StreamResultMessage m = new StreamResultMessage(); |
| m.setXID(reqMsg.getXID()); |
| m.setResult(StreamResultMessage.RESULT_EXCEPTION); |
| m.setException(e); |
| return m; |
| } |
| } |
| case RemoteOSGiMessage.REQUEST_BUNDLE: |
| final RequestBundleMessage reqB = (RequestBundleMessage) msg; |
| |
| try { |
| final String serviceID = reqB.getServiceID(); |
| |
| final RemoteServiceRegistration reg = getServiceRegistration(serviceID); |
| |
| final byte[] bytes = RemoteOSGiServiceImpl.getBundle(reg |
| .getReference().getBundle()); |
| |
| final DeliverBundlesMessage delB = new DeliverBundlesMessage(); |
| delB.setXID(reqB.getXID()); |
| delB.setDependencies(new byte[][] { bytes }); |
| return delB; |
| } catch (IOException ioe) { |
| ioe.printStackTrace(); |
| return null; |
| } |
| case RemoteOSGiMessage.REQUEST_DEPENDENCIES: |
| final RequestDependenciesMessage reqDeps = (RequestDependenciesMessage) msg; |
| |
| try { |
| final byte[][] bundleBytes = RemoteOSGiServiceImpl |
| .getBundlesForPackages(reqDeps.getPackages()); |
| final DeliverBundlesMessage delDeps = new DeliverBundlesMessage(); |
| delDeps.setXID(reqDeps.getXID()); |
| delDeps.setDependencies(bundleBytes); |
| return delDeps; |
| } catch (IOException ioe) { |
| ioe.printStackTrace(); |
| return null; |
| } |
| default: |
| throw new RemoteOSGiException("Unimplemented message " + msg); //$NON-NLS-1$ |
| } |
| } |
| |
| /** |
| * send a message and wait for the result. |
| * |
| * @param msg |
| * the message. |
| * @return the result message. |
| */ |
| private RemoteOSGiMessage sendAndWait(final RemoteOSGiMessage msg) { |
| if (msg.getXID() == 0) { |
| msg.setXID(RemoteOSGiServiceImpl.nextXid()); |
| } |
| final Integer xid = new Integer(msg.getXID()); |
| final WaitingCallback blocking = new WaitingCallback(); |
| |
| synchronized (callbacks) { |
| callbacks.put(xid, blocking); |
| } |
| |
| send(msg); |
| |
| // wait for the reply |
| synchronized (blocking) { |
| final long timeout = System.currentTimeMillis() + TIMEOUT; |
| RemoteOSGiMessage result = blocking.getResult(); |
| try { |
| while (result == null && networkChannel != null |
| && System.currentTimeMillis() < timeout) { |
| blocking.wait(TIMEOUT); |
| result = blocking.getResult(); |
| } |
| } catch (InterruptedException ie) { |
| throw new RemoteOSGiException( |
| "Interrupted while waiting for callback", ie); //$NON-NLS-1$ |
| } |
| if (result != null) { |
| return result; |
| } else if (networkChannel == null) { |
| throw new RemoteOSGiException("Channel is closed"); //$NON-NLS-1$ |
| } else { |
| throw new RemoteOSGiException( |
| "Method Invocation failed, timeout exceeded."); //$NON-NLS-1$ |
| } |
| } |
| |
| } |
| |
| /** |
| * get the remote service registration for a given service ID. |
| * |
| * @param serviceID |
| * the serviceID. |
| * @return the remote service registration, or <code>null</code>. |
| */ |
| private RemoteServiceRegistration getServiceRegistration( |
| final String serviceID) { |
| final RemoteServiceRegistration reg = RemoteOSGiServiceImpl |
| .getServiceRegistration(serviceID); |
| |
| localServices.put(serviceID, reg); |
| return reg; |
| } |
| |
| /** |
| * populate a lease message with values. |
| * |
| * @param lease |
| * the lease message. |
| * @param regs |
| * the registrations. |
| * @param topics |
| * the topics. |
| */ |
| private void populateLease(final LeaseMessage lease, |
| final RemoteServiceRegistration[] regs, final String[] topics) { |
| final String[] serviceIDs = new String[regs.length]; |
| final String[][] serviceInterfaces = new String[regs.length][]; |
| final Dictionary[] serviceProperties = new Dictionary[regs.length]; |
| |
| for (short i = 0; i < regs.length; i++) { |
| serviceIDs[i] = String.valueOf(regs[i].getServiceID()); |
| serviceInterfaces[i] = regs[i].getInterfaceNames(); |
| serviceProperties[i] = regs[i].getProperties(); |
| } |
| lease.setServiceIDs(serviceIDs); |
| lease.setServiceInterfaces(serviceInterfaces); |
| lease.setServiceProperties(serviceProperties); |
| lease.setTopics(topics); |
| } |
| |
| /** |
| * process a lease message. |
| * |
| * @param lease |
| * the lease message. |
| * @return the remote references. |
| */ |
| private RemoteServiceReference[] processLease(final LeaseMessage lease) { |
| final String[] serviceIDs = lease.getServiceIDs(); |
| final String[][] serviceInterfaces = lease.getServiceInterfaces(); |
| final Dictionary[] serviceProperties = lease.getServiceProperties(); |
| |
| |
| final RemoteServiceReferenceImpl[] refs = new RemoteServiceReferenceImpl[serviceIDs.length]; |
| for (short i = 0; i < serviceIDs.length; i++) { |
| final String serviceID = serviceIDs[i]; |
| final String serviceURI = getRemoteAddress().resolve("#" + serviceID).toString(); |
| final Dictionary properties = serviceProperties[i]; |
| sanitizeServiceProperties(properties, serviceURI); |
| refs[i] = new RemoteServiceReferenceImpl(serviceInterfaces[i], |
| serviceID, properties, this); |
| |
| remoteServices.put(refs[i].getURI().toString(), refs[i]); |
| RemoteOSGiServiceImpl |
| .notifyRemoteServiceListeners(new RemoteServiceEvent( |
| RemoteServiceEvent.REGISTERED, refs[i])); |
| } |
| updateTopics(lease.getTopics(), new String[0]); |
| return refs; |
| } |
| |
| private void sanitizeServiceProperties( |
| final Dictionary properties, final String serviceURI) { |
| // adjust the properties |
| properties.put(RemoteOSGiService.SERVICE_URI, serviceURI); |
| // remove the service PID, if set |
| properties.remove(Constants.SERVICE_PID); |
| // remove the R-OSGi registration property |
| properties.remove(RemoteOSGiService.R_OSGi_REGISTRATION); |
| // also remote the ECF registration property |
| properties.remove("org.eclipse.ecf.serviceRegistrationRemote"); //$NON-NLS-1$ |
| } |
| |
| /** |
| * perform a stream operation. |
| * |
| * @param requestMsg |
| * the request message. |
| * @return the result message. |
| * @throws IOException |
| */ |
| private StreamResultMessage doStreamOp(final StreamRequestMessage requestMsg) |
| throws IOException { |
| try { |
| // send the message and get a StreamResultMessage in return |
| final StreamResultMessage result = (StreamResultMessage) sendAndWait(requestMsg); |
| if (result.causedException()) { |
| throw result.getException(); |
| } |
| return result; |
| } catch (final RemoteOSGiException e) { |
| throw new RemoteOSGiException("Invocation of operation " //$NON-NLS-1$ |
| + requestMsg.getOp() + " on stream " //$NON-NLS-1$ |
| + requestMsg.getStreamID() + " failed.", e); //$NON-NLS-1$ |
| } |
| } |
| |
| /** |
| * update the topics |
| * |
| * @param topicsAdded |
| * the topics added. |
| * @param topicsRemoved |
| * the topics removed. |
| */ |
| private void updateTopics(final String[] topicsAdded, |
| final String[] topicsRemoved) { |
| |
| if (handlerReg == null) { |
| // Remote might send a |
| // ch.ethz.iks.r_osgi.messages.LeaseUpdateMessage.TOPIC_UPDATE |
| // message |
| // (see |
| // ch.ethz.iks.r_osgi.impl.RemoteOSGiServiceImpl.setupTrackers(...).new |
| // ServiceTrackerCustomizer() {...}.removedService(ServiceReference, |
| // Object)) with null as the topicsAdded list. Thus, ignore null. |
| if (topicsAdded != null && topicsAdded.length > 0) { |
| // register handler |
| final Dictionary properties = new Hashtable(); |
| properties.put(EventConstants.EVENT_TOPIC, topicsAdded); |
| properties.put(EventConstants.EVENT_FILTER, NO_LOOPS); |
| properties.put(RemoteOSGiServiceImpl.R_OSGi_INTERNAL, |
| Boolean.TRUE); |
| handlerReg = RemoteOSGiActivator.getActivator().getContext() |
| .registerService(EventHandler.class.getName(), |
| new EventForwarder(), properties); |
| remoteTopics.addAll(Arrays.asList(topicsAdded)); |
| } |
| } else { |
| if (topicsRemoved != null) { |
| remoteTopics.removeAll(Arrays.asList(topicsRemoved)); |
| } |
| if (topicsAdded != null) { |
| remoteTopics.addAll(Arrays.asList(topicsAdded)); |
| } |
| |
| if (remoteTopics.size() == 0) { |
| // unregister handler |
| handlerReg.unregister(); |
| handlerReg = null; |
| } else { |
| // update topics |
| final Dictionary properties = new Hashtable(); |
| properties.put(EventConstants.EVENT_TOPIC, remoteTopics |
| .toArray(new String[remoteTopics.size()])); |
| properties.put(EventConstants.EVENT_FILTER, NO_LOOPS); |
| properties.put(RemoteOSGiServiceImpl.R_OSGi_INTERNAL, |
| Boolean.TRUE); |
| handlerReg.setProperties(properties); |
| } |
| } |
| |
| if (RemoteOSGiServiceImpl.MSG_DEBUG) { |
| RemoteOSGiServiceImpl.log.log(LogService.LOG_DEBUG, |
| "NEW REMOTE TOPIC SPACE for " + getRemoteAddress() + " is " //$NON-NLS-1$ //$NON-NLS-2$ |
| + remoteTopics); |
| } |
| } |
| |
| /** |
| * creates a placeholder for an InputStream that can be sent to the other |
| * party and will be converted to an InputStream proxy there. |
| * |
| * @param origIS |
| * the instance of InputStream that needs to be remoted |
| * @return the placeholder object that is sent to the actual client |
| */ |
| private InputStreamHandle getInputStreamPlaceholder(final InputStream origIS) { |
| final InputStreamHandle sp = new InputStreamHandle(nextStreamID()); |
| streams.put(new Integer(sp.getStreamID()), origIS); |
| return sp; |
| } |
| |
| /** |
| * creates a proxy for the input stream that corresponds to the placeholder. |
| * |
| * @param placeholder |
| * the placeholder for the remote input stream |
| * @return the proxy for the input stream |
| */ |
| private InputStream getInputStreamProxy(final InputStreamHandle placeholder) { |
| return new InputStreamProxy(placeholder.getStreamID(), this); |
| } |
| |
| /** |
| * creates a placeholder for an OutputStream that can be sent to the other |
| * party and will be converted to an OutputStream proxy there. |
| * |
| * @param origOS |
| * the instance of OutputStream that needs to be remoted |
| * @return the placeholder object that is sent to the actual client |
| */ |
| private OutputStreamHandle getOutputStreamPlaceholder( |
| final OutputStream origOS) { |
| final OutputStreamHandle sp = new OutputStreamHandle(nextStreamID()); |
| streams.put(new Integer(sp.getStreamID()), origOS); |
| return sp; |
| } |
| |
| /** |
| * creates a proxy for the output stream that corresponds to the |
| * placeholder. |
| * |
| * @param placeholder |
| * the placeholder for the remote output stream |
| * @return the proxy for the output stream |
| */ |
| private OutputStream getOutputStreamProxy( |
| final OutputStreamHandle placeholder) { |
| return new OutputStreamProxy(placeholder.getStreamID(), this); |
| } |
| |
| /** |
| * get the next stream wrapper id. |
| * |
| * @return the next stream wrapper id. |
| */ |
| private synchronized short nextStreamID() { |
| if (nextStreamID == -1) { |
| nextStreamID = 0; |
| } |
| return (++nextStreamID); |
| } |
| |
| /** |
| * closes all streams that are still open. |
| */ |
| private void closeStreams() { |
| final Object[] s = streams.values().toArray(); |
| try { |
| for (int i = 0; i < s.length; i++) { |
| if (s[i] instanceof InputStream) { |
| ((InputStream) s[i]).close(); |
| } else if (s[i] instanceof OutputStream) { |
| ((OutputStream) s[i]).close(); |
| } else { |
| RemoteOSGiServiceImpl.log |
| .log(LogService.LOG_WARNING, |
| "Object in input streams map was not an instance of a stream."); //$NON-NLS-1$ |
| } |
| } |
| } catch (final IOException e) { |
| } |
| } |
| |
| /** |
| * forwards events over the channel to the remote peer. |
| * |
| * @author Jan S. Rellermeyer, ETH Zurich |
| * @category EventHandler |
| */ |
| final class EventForwarder implements EventHandler { |
| |
| /** |
| * handle an event. |
| * |
| * @param event |
| * the event. |
| */ |
| public void handleEvent(final Event event) { |
| try { |
| final RemoteEventMessage msg = new RemoteEventMessage(); |
| msg.setTopic(event.getTopic()); |
| final String[] propertyNames = event.getPropertyNames(); |
| final Dictionary props = new Hashtable(); |
| for (int i = 0; i < propertyNames.length; i++) { |
| props.put(propertyNames[i], event |
| .getProperty(propertyNames[i])); |
| } |
| props.put(RemoteEventMessage.EVENT_SENDER_URI, networkChannel |
| .getLocalAddress()); |
| msg.setProperties(props); |
| send(msg); |
| |
| if (RemoteOSGiServiceImpl.MSG_DEBUG) { |
| RemoteOSGiServiceImpl.log.log(LogService.LOG_DEBUG, |
| "Forwarding Event " + event); //$NON-NLS-1$ |
| } |
| } catch (final Exception e) { |
| e.printStackTrace(); |
| } |
| } |
| } |
| |
| /** |
| * callback that signals when the result has become available. |
| * |
| * @author Jan S. Rellermeyer |
| * |
| */ |
| class WaitingCallback implements AsyncCallback { |
| |
| private RemoteOSGiMessage result; |
| |
| public synchronized void result(RemoteOSGiMessage msg) { |
| result = msg; |
| this.notifyAll(); |
| } |
| |
| synchronized RemoteOSGiMessage getResult() { |
| return result; |
| } |
| |
| } |
| |
| } |