/* Copyright (c) 2006-2008 Jan S. Rellermeyer
 * Systems Group,
 * Department of Computer Science, ETH Zurich.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *    - Redistributions of source code must retain the above copyright notice,
 *      this list of conditions and the following disclaimer.
 *    - Redistributions in binary form must reproduce the above copyright
 *      notice, this list of conditions and the following disclaimer in the
 *      documentation and/or other materials provided with the distribution.
 *    - Neither the name of ETH Zurich nor the names of its contributors may be
 *      used to endorse or promote products derived from this software without
 *      specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */
package ch.ethz.iks.r_osgi.impl;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Dictionary;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import java.util.zip.CRC32;

import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.framework.Filter;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceReference;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.event.EventConstants;
import org.osgi.service.event.EventHandler;
import org.osgi.service.log.LogService;
import org.osgi.service.packageadmin.PackageAdmin;
import org.osgi.util.tracker.ServiceTracker;
import org.osgi.util.tracker.ServiceTrackerCustomizer;

import ch.ethz.iks.r_osgi.AsyncRemoteCallCallback;
import ch.ethz.iks.r_osgi.RemoteOSGiException;
import ch.ethz.iks.r_osgi.RemoteOSGiService;
import ch.ethz.iks.r_osgi.RemoteServiceEvent;
import ch.ethz.iks.r_osgi.RemoteServiceListener;
import ch.ethz.iks.r_osgi.RemoteServiceReference;
import ch.ethz.iks.r_osgi.Remoting;
import ch.ethz.iks.r_osgi.SurrogateRegistration;
import ch.ethz.iks.r_osgi.URI;
import ch.ethz.iks.r_osgi.channels.ChannelEndpoint;
import ch.ethz.iks.r_osgi.channels.ChannelEndpointManager;
import ch.ethz.iks.r_osgi.channels.NetworkChannel;
import ch.ethz.iks.r_osgi.channels.NetworkChannelFactory;
import ch.ethz.iks.r_osgi.messages.LeaseUpdateMessage;
import ch.ethz.iks.r_osgi.service_discovery.ServiceDiscoveryHandler;
import ch.ethz.iks.util.CollectionUtils;
import ch.ethz.iks.util.StringUtils;

/**
 * <p>
 * The R-OSGi core class. Handles remote channels and subscriptions from the
 * local framework. Local services can be released for remoting and then
 * discovered by remote peers.
 * </p>
 * 
 * @author Jan S. Rellermeyer, ETH Zurich
 * @since 0.1
 */
final class RemoteOSGiServiceImpl implements RemoteOSGiService, Remoting {

	static boolean IS_JAVA5 = false;

	static boolean IS_R4 = false;

	private final static Method getEntry;
	private final static Method getEntryPaths;
	private final static File base;

	static {
		final String verString = System.getProperty("java.class.version"); //$NON-NLS-1$
		if (verString != null && Float.parseFloat(verString) >= 49) {
			IS_JAVA5 = true;
		}
		final String osgiVerString = System
				.getProperty(Constants.FRAMEWORK_VERSION);
		if (osgiVerString != null && (!osgiVerString.trim().startsWith("1.3"))) { //$NON-NLS-1$
			IS_R4 = true;
		}
		Method m = null;
		Method n = null;
		try {
			m = Bundle.class
					.getMethod("getEntry", new Class[] { String.class });
			n = Bundle.class.getMethod("getEntryPaths",
					new Class[] { String.class });
		} catch (SecurityException e) {
			e.printStackTrace();
		} catch (NoSuchMethodException e) {
		}
		getEntry = m;
		getEntryPaths = n;

		base = getEntry == null ? RemoteOSGiActivator.getActivator()
				.getContext().getDataFile("../..") : null;
	}

	/**
	 * the R-OSGi standard port.
	 */
	static int R_OSGI_PORT = 9278;

	/**
	 * the R-OSGi port property.
	 */
	static final String R_OSGi_PORT_PROPERTY = "ch.ethz.iks.r_osgi.port"; //$NON-NLS-1$

	/**
	 * register the default tcp channel? If not set to "false", the channel gets
	 * registered.
	 */
	static final String REGISTER_DEFAULT_TCP_CHANNEL = "ch.ethz.iks.r_osgi.registerDefaultChannel"; //$NON-NLS-1$

	/**
	 * register the default tcp channel? If not set to "false", the channel gets
	 * registered.
	 */
	static final String THREADS_PER_ENDPOINT = "ch.ethz.iks.r_osgi.threadsPerEndpoint"; //$NON-NLS-1$

	/**
	 * constant that holds the property string for proxy debug option.
	 */
	static final String PROXY_DEBUG_PROPERTY = "ch.ethz.iks.r_osgi.debug.proxyGeneration"; //$NON-NLS-1$

	/**
	 * constant that holds the property string for message debug option.
	 */
	static final String MSG_DEBUG_PROPERTY = "ch.ethz.iks.r_osgi.debug.messages"; //$NON-NLS-1$

	/**
	 * constant that holds the property string for internal debug option.
	 */
	static final String DEBUG_PROPERTY = "ch.ethz.iks.r_osgi.debug.internal"; //$NON-NLS-1$

	/**
	 * marker for channel-registered event handlers so that they don't
	 * contribute to the peer's topic space.
	 */
	static final String R_OSGi_INTERNAL = "internal"; //$NON-NLS-1$

	/**
	 * file name of the manifest
	 */
	private static final String MANIFEST_FILE_NAME = "META-INF/MANIFEST.MF"; //$NON-NLS-1$

	/**
	 * We don't want it platform dependent because the Jars will always use the
	 * unix variant
	 */
	private static final String SEPARATOR_CHAR = "/"; //$NON-NLS-1$

	/**
	 * the buffer size
	 */
	private static final int BUFFER_SIZE = 2048;

	/**
	 * how many worker threads per endpoint?
	 */
	static final int MAX_THREADS_PER_ENDPOINT = Integer.getInteger(
			THREADS_PER_ENDPOINT, 2).intValue();

	/**
	 * log proxy generation debug output.
	 */
	static boolean PROXY_DEBUG;

	/**
	 * log message traffic.
	 */
	static boolean MSG_DEBUG;

	/**
	 * log method invocation debug output.
	 */
	static boolean DEBUG;

	/**
	 * the address of this peer.
	 */
	static String MY_ADDRESS;

	/**
	 * service reference -> remote service registration.
	 */
	static Map serviceRegistrations = new HashMap(1);

	/**
	 * next transaction id.
	 */
	private static int nextXid;

	/**
	 * OSGi log service instance.
	 */
	static LogService log;

	/**
	 * the event admin tracker
	 */
	static ServiceTracker eventAdminTracker;

	/**
	 * 
	 */
	private static ServiceTracker eventHandlerTracker;

	/**
	 * 
	 */
	private static ServiceTracker remoteServiceTracker;

	/**
	 * 
	 */
	private static ServiceTracker remoteServiceListenerTracker;

	/**
	 * 
	 */
	private static ServiceTracker networkChannelFactoryTracker;

	/**
	 * 
	 */
	private static ServiceTracker serviceDiscoveryHandlerTracker;

	/**
	 * Channel ID --> ChannelEndpoint.
	 */
	private static Map channels = new HashMap(0);

	/**
	 * Channel ID --> ChannelEndpointMultiplexer
	 */
	private static Map multiplexers = new HashMap(0);
	
	/**
	 * Event topics this instance is going to ignore and not remote
	 */
	private static final Set topicFilters = new HashSet(0);

	/**
	 * The package admin
	 */
	private static PackageAdmin pkgAdmin;

	/**
	 * creates a new RemoteOSGiServiceImpl instance.
	 * 
	 * @throws IOException
	 *             in case of IO problems.
	 */
	RemoteOSGiServiceImpl() throws IOException {
		// [R_OSGi] Event Admin problems
		// https://bugs.eclipse.org/419327
		final String topics = System.getProperty("ch.ethz.iks.r_osgi.topic.filter", "");
		final String[] strings = StringUtils.stringToArray(topics, ",");
		for (int i = 0; i < strings.length; i++) {
			topicFilters.add(strings[i]);
		}
		// [TCK][r-OSGi] NonSerializableException when running remoteserviceadmin ct
		// https://bugs.eclipse.org/418740
		topicFilters.add("org/osgi/service/remoteserviceadmin/*");
		
		// find out own IP address
		try {
			MY_ADDRESS = InetAddress.getAllByName(InetAddress.getLocalHost()
					.getHostName())[0].getHostAddress();
		} catch (final Throwable t) {
			MY_ADDRESS = System.getProperty("ch.ethz.iks.r_osgi.ip", //$NON-NLS-1$
					"127.0.0.1"); //$NON-NLS-1$
		}

		// set the debug switches
		final BundleContext context = RemoteOSGiActivator.getActivator()
				.getContext();
		String prop = context.getProperty(PROXY_DEBUG_PROPERTY);
		PROXY_DEBUG = prop != null ? Boolean.valueOf(prop).booleanValue()
				: false;
		prop = context.getProperty(MSG_DEBUG_PROPERTY);
		MSG_DEBUG = prop != null ? Boolean.valueOf(prop).booleanValue() : false;
		prop = context.getProperty(DEBUG_PROPERTY);
		DEBUG = prop != null ? Boolean.valueOf(prop).booleanValue() : false;

		if (log != null) {
			if (PROXY_DEBUG) {
				log.log(LogService.LOG_INFO, "PROXY DEBUG OUTPUTS ENABLED"); //$NON-NLS-1$
			}
			if (MSG_DEBUG) {
				log.log(LogService.LOG_INFO, "MESSAGE DEBUG OUTPUTS ENABLED"); //$NON-NLS-1$
			}
			if (DEBUG) {
				log.log(LogService.LOG_INFO, "INTERNAL DEBUG OUTPUTS ENABLED"); //$NON-NLS-1$
			}
		} else {
			if (PROXY_DEBUG || MSG_DEBUG || DEBUG) {
				System.err
						.println("WARNING: NO LOG SERVICE PRESENT, DEBUG PROPERTIES HAVE NO EFFECT ..."); //$NON-NLS-1$
				PROXY_DEBUG = false;
				MSG_DEBUG = false;
				DEBUG = false;
			}
		}

		// set port
		prop = context.getProperty(R_OSGi_PORT_PROPERTY);
		R_OSGI_PORT = prop != null ? Integer.parseInt(prop) : 9278;

		// initialize the transactionID with a random value
		nextXid = (short) Math.round(Math.random() * Short.MAX_VALUE);

		// get the package admin
		final ServiceReference ref = context
				.getServiceReference(PackageAdmin.class.getName());
		if (ref == null) {
			// TODO: handle this more gracefully
			throw new RuntimeException(
					"No package admin service available, R-OSGi terminates."); //$NON-NLS-1$
		}
		pkgAdmin = (PackageAdmin) context.getService(ref);

		setupTrackers(context);
	}

	private void setupTrackers(final BundleContext context) throws IOException {

		// initialize service trackers
		eventAdminTracker = new ServiceTracker(context, EventAdmin.class
				.getName(), null);
		eventAdminTracker.open();
		if (eventAdminTracker.getTrackingCount() == 0 && log != null) {
			log
					.log(LogService.LOG_WARNING,
							"NO EVENT ADMIN FOUND. REMOTE EVENT DELIVERY TEMPORARILY DISABLED."); //$NON-NLS-1$
		}

		try {
			eventHandlerTracker = new ServiceTracker(
					context,
					context.createFilter("(&(" + Constants.OBJECTCLASS + "=" //$NON-NLS-1$ //$NON-NLS-2$
							+ EventHandler.class.getName()
							+ ")(|(!(" //$NON-NLS-1$
							+ R_OSGi_INTERNAL
							+ "=*))" //$NON-NLS-1$
							// [TCK][r-OSGi] NonSerializableException when running remoteserviceadmin ct
							// https://bugs.eclipse.org/418740
							+ "(!(" //$NON-NLS-1$
							+ EventConstants.EVENT_TOPIC
							+ "=org/osgi/service/remoteserviceadmin/*))))"), //$NON-NLS-1$
					new ServiceTrackerCustomizer() {

						public Object addingService(
								final ServiceReference reference) {

							// https://bugs.eclipse.org/bugs/show_bug.cgi?id=326033
							Collection theTopics;
							Object topic = reference
									.getProperty(EventConstants.EVENT_TOPIC);
							if (topic instanceof String)
								theTopics = Arrays
										.asList(new String[] { (String) topic });
							else
								theTopics = Arrays.asList((String[]) topic);

							// Remove filtered topics
							theTopics = StringUtils.rightDifference(
									topicFilters, theTopics);

							
							final LeaseUpdateMessage lu = new LeaseUpdateMessage();
							lu.setType(LeaseUpdateMessage.TOPIC_UPDATE);
							lu.setServiceID(""); //$NON-NLS-1$
							lu.setPayload(new Object[] {
									theTopics.toArray(new String[theTopics
											.size()]), null });
							updateLeases(lu);

							return theTopics;
						}

						public void modifiedService(
								final ServiceReference reference,
								final Object oldTopics) {

							final List oldTopicList = (List) oldTopics;

							// https://bugs.eclipse.org/bugs/show_bug.cgi?id=326033
							Collection newTopicList;
							Object topic = reference
									.getProperty(EventConstants.EVENT_TOPIC);
							if (topic instanceof String)
								newTopicList = Arrays
										.asList(new String[] { (String) topic });
							else
								newTopicList = Arrays.asList((String[]) topic);
							
							// Remove filtered topics
							newTopicList = StringUtils.rightDifference(
									topicFilters, newTopicList);

							final Collection removed = CollectionUtils
									.rightDifference(newTopicList, oldTopicList);
							final Collection added = CollectionUtils
									.leftDifference(newTopicList, oldTopicList);
							final String[] addedTopics = (String[]) added
									.toArray(new String[removed.size()]);
							final String[] removedTopics = (String[]) removed
									.toArray(addedTopics);
							oldTopicList.removeAll(removed);
							oldTopicList.addAll(added);
							final LeaseUpdateMessage lu = new LeaseUpdateMessage();
							lu.setType(LeaseUpdateMessage.TOPIC_UPDATE);
							lu.setServiceID(""); //$NON-NLS-1$
							lu.setPayload(new Object[] { addedTopics,
									removedTopics });
							updateLeases(lu);
						}

						public void removedService(
								final ServiceReference reference,
								final Object oldTopics) {
							final List oldTopicsList = (List) oldTopics;
							final String[] removedTopics = (String[]) oldTopicsList
									.toArray(new String[oldTopicsList.size()]);
							final LeaseUpdateMessage lu = new LeaseUpdateMessage();
							lu.setType(LeaseUpdateMessage.TOPIC_UPDATE);
							lu.setServiceID(""); //$NON-NLS-1$
							lu.setPayload(new Object[] { null, removedTopics });
							updateLeases(lu);
						}
					});
			eventHandlerTracker.open();

			if (DEBUG) {
				log.log(LogService.LOG_DEBUG, "Local topic space " //$NON-NLS-1$
						+ Arrays.asList(getTopics()));
			}

			remoteServiceListenerTracker = new ServiceTracker(context,
					RemoteServiceListener.class.getName(), null);
			remoteServiceListenerTracker.open();

			serviceDiscoveryHandlerTracker = new ServiceTracker(context,
					ServiceDiscoveryHandler.class.getName(),
					new ServiceTrackerCustomizer() {

						public Object addingService(
								final ServiceReference reference) {
							// register all known services for discovery
							final ServiceDiscoveryHandler handler = (ServiceDiscoveryHandler) context
									.getService(reference);

							RemoteServiceRegistration[] regs = null;

							synchronized (serviceRegistrations) {
								regs = (RemoteServiceRegistration[]) serviceRegistrations.values()
										.toArray(new RemoteServiceRegistration[serviceRegistrations.size()]);
							}

							for (int i = 0; i < regs.length; i++) {
								handler
										.registerService(
												regs[i].getReference(),
												regs[i].getProperties(),
												URI
														.create("r-osgi://" //$NON-NLS-1$
																+ RemoteOSGiServiceImpl.MY_ADDRESS
																+ ":" //$NON-NLS-1$
																+ RemoteOSGiServiceImpl.R_OSGI_PORT
																+ "#" //$NON-NLS-1$
																+ regs[i]
																		.getServiceID()));
							}
							return handler;
						}

						public void modifiedService(
								final ServiceReference reference,
								final Object service) {

						}

						public void removedService(
								final ServiceReference reference,
								final Object service) {

						}

					});
			serviceDiscoveryHandlerTracker.open();

			remoteServiceTracker = new ServiceTracker(
					context,
					context.createFilter("(" //$NON-NLS-1$
							+ RemoteOSGiService.R_OSGi_REGISTRATION + "=*)"), new ServiceTrackerCustomizer() { //$NON-NLS-1$

						public Object addingService(
								final ServiceReference reference) {

							// FIXME: Surrogates have to be monitored
							// separately!!!
							final ServiceReference service = Arrays
									.asList(
											(String[]) reference
													.getProperty(Constants.OBJECTCLASS))
									.contains(
											SurrogateRegistration.class
													.getName()) ? (ServiceReference) reference
									.getProperty(SurrogateRegistration.SERVICE_REFERENCE)
									: reference;

							try {
								final RemoteServiceRegistration reg = new RemoteServiceRegistration(
										reference, service);

								if (log != null) {
									log.log(LogService.LOG_INFO, "REGISTERING " //$NON-NLS-1$
											+ reference
											+ " AS PROXIED SERVICES"); //$NON-NLS-1$
								}

								synchronized (serviceRegistrations) {
									serviceRegistrations.put(service, reg);
								}

								registerWithServiceDiscovery(reg);

								final LeaseUpdateMessage lu = new LeaseUpdateMessage();
								lu.setType(LeaseUpdateMessage.SERVICE_ADDED);
								lu.setServiceID(String.valueOf(reg
										.getServiceID()));
								lu.setPayload(new Object[] {
										reg.getInterfaceNames(),
										reg.getProperties() });
								updateLeases(lu);
								return service;
							} catch (final ClassNotFoundException e) {
								e.printStackTrace();
								throw new RemoteOSGiException(
										"Cannot find class " + service, e); //$NON-NLS-1$
							}
						}

						public void modifiedService(
								final ServiceReference reference,
								final Object service) {
							if (reference.getProperty(R_OSGi_REGISTRATION) == null) {
								removedService(reference, service);
								return;
							}
							final RemoteServiceRegistration reg = (RemoteServiceRegistration) serviceRegistrations
									.get(reference);

							registerWithServiceDiscovery(reg);

							final LeaseUpdateMessage lu = new LeaseUpdateMessage();
							lu.setType(LeaseUpdateMessage.SERVICE_MODIFIED);
							lu.setServiceID(String.valueOf(reg.getServiceID()));
							lu.setPayload(new Object[] { null,
									reg.getProperties() });
							updateLeases(lu);
						}

						public void removedService(
								final ServiceReference reference,
								final Object service) {

							final ServiceReference sref = Arrays
									.asList(
											(String[]) reference
													.getProperty(Constants.OBJECTCLASS))
									.contains(
											SurrogateRegistration.class
													.getName()) ? (ServiceReference) reference
									.getProperty(SurrogateRegistration.SERVICE_REFERENCE)
									: reference;

							final RemoteServiceRegistration reg = (RemoteServiceRegistration) serviceRegistrations
									.remove(sref);

							unregisterFromServiceDiscovery(reg);

							final LeaseUpdateMessage lu = new LeaseUpdateMessage();
							lu.setType(LeaseUpdateMessage.SERVICE_REMOVED);
							lu.setServiceID(String.valueOf(reg.getServiceID()));
							lu.setPayload(new Object[] { null, null });
							updateLeases(lu);
						}

					});
			remoteServiceTracker.open(true);

			networkChannelFactoryTracker = new ServiceTracker(
					context,
					context.createFilter("(" + Constants.OBJECTCLASS + "=" //$NON-NLS-1$ //$NON-NLS-2$
							+ NetworkChannelFactory.class.getName() + ")"), new ServiceTrackerCustomizer() { //$NON-NLS-1$

						public Object addingService(
								final ServiceReference reference) {
							final NetworkChannelFactory factory = (NetworkChannelFactory) context
									.getService(reference);
							try {
								factory.activate(RemoteOSGiServiceImpl.this);
							} catch (final IOException ioe) {
								if (log != null) {
									log.log(LogService.LOG_ERROR, ioe
											.getMessage(), ioe);
								}
							}
							return factory;
						}

						public void modifiedService(
								final ServiceReference reference,
								final Object factory) {
						}

						public void removedService(
								final ServiceReference reference,
								final Object factory) {
						}
					});
			networkChannelFactoryTracker.open();

		} catch (final InvalidSyntaxException ise) {
			ise.printStackTrace();
		}

	}

	private NetworkChannelFactory getNetworkChannelFactory(final String protocol)
			throws RemoteOSGiException {
		try {
			final Filter filter = RemoteOSGiActivator.getActivator()
					.getContext().createFilter(
							"(" //$NON-NLS-1$
									+ NetworkChannelFactory.PROTOCOL_PROPERTY
									+ "=" + protocol //$NON-NLS-1$
									+ ")"); //$NON-NLS-1$
			final ServiceReference[] refs = networkChannelFactoryTracker
					.getServiceReferences();

			if (refs != null) {
				for (int i = 0; i < refs.length; i++) {
					if (filter.match(refs[i])) {
						return (NetworkChannelFactory) networkChannelFactoryTracker
								.getService(refs[i]);
					}
				}
			}
			throw new RemoteOSGiException("No NetworkChannelFactory for " //$NON-NLS-1$
					+ protocol + " found."); //$NON-NLS-1$

		} catch (final InvalidSyntaxException e) {
			// does not happen
			e.printStackTrace();
			return null;
		}
	}

	/**
	 * connect to a remote OSGi host.
	 * 
	 * @param uri
	 *            the uri of the remote OSGi peer.
	 * @return the array of service urls of services offered by the remote peer.
	 * @throws RemoteOSGiException
	 *             in case of errors.
	 * @throws IOException
	 *             in case of IO problems.
	 * @since 0.6
	 */
	public RemoteServiceReference[] connect(final URI uri)
			throws RemoteOSGiException, IOException {

		final URI endpoint = URI.create(getChannelURI(uri));
		final ChannelEndpointImpl test = (ChannelEndpointImpl) channels
				.get(endpoint.toString());
		if (test != null) {
			test.usageCounter++;
			return test.getAllRemoteReferences(null);
		}

		final ChannelEndpointImpl channel;
		final String protocol = endpoint.getScheme();

		final NetworkChannelFactory factory = getNetworkChannelFactory(protocol);
		channel = new ChannelEndpointImpl(factory, endpoint);

		return channel.sendLease(getServices(endpoint.getScheme()), getTopics());
	}

	/**
	 * 
	 * @see ch.ethz.iks.r_osgi.RemoteOSGiService#getListeningPort(java.lang.String)
	 * @category RemoteOSGiService
	 */
	public int getListeningPort(final String protocol)
			throws RemoteOSGiException {
		final NetworkChannelFactory factory = getNetworkChannelFactory(protocol);
		return factory.getListeningPort(protocol);
	}

	/**
	 * @param endpoint
	 * @throws RemoteOSGiException
	 * 
	 */
	public void disconnect(final URI endpoint) throws RemoteOSGiException {
		final String channelURI = getChannelURI(endpoint).toString();
		final ChannelEndpointImpl channel = (ChannelEndpointImpl) channels
				.get(channelURI);
		if (channel != null) {
			if (channel.usageCounter == 1) {
				channel.dispose();
				multiplexers.remove(channelURI);
			} else {
				channel.usageCounter--;
			}
		}
	}

	/**
	 * 
	 * @see ch.ethz.iks.r_osgi.RemoteOSGiService#getRemoteServiceReference(ch.ethz.iks.r_osgi.URI)
	 */
	public RemoteServiceReference getRemoteServiceReference(final URI serviceURI) {
		final String uri = getChannelURI(serviceURI);
		ChannelEndpointImpl channel = (ChannelEndpointImpl) channels
				.get(getChannelURI(serviceURI));
		if (channel == null) {
			try {
				connect(serviceURI);
				channel = (ChannelEndpointImpl) channels.get(uri);
			} catch (final IOException ioe) {
				throw new RemoteOSGiException("Cannot connect to " + uri); //$NON-NLS-1$
			}
		}
		return channel.getRemoteReference(serviceURI.toString());
	}

	/**
	 * 
	 * @see ch.ethz.iks.r_osgi.RemoteOSGiService#getRemoteServiceReferences(ch.ethz.iks.r_osgi.URI,
	 *      java.lang.String, org.osgi.framework.Filter)
	 */
	public RemoteServiceReference[] getRemoteServiceReferences(
			final URI service, final String clazz, final Filter filter) {
		final String uri = getChannelURI(service);
		ChannelEndpointImpl channel = (ChannelEndpointImpl) channels.get(uri);
		if (channel == null) {
			try {
				connect(service);
				channel = (ChannelEndpointImpl) channels.get(uri);
			} catch (final IOException ioe) {
				throw new RemoteOSGiException("Cannot connect to " + uri); //$NON-NLS-1$
			}
		}
		if (clazz == null) {
			return channel.getAllRemoteReferences(null);
		}
		try {
			return channel.getAllRemoteReferences(RemoteOSGiActivator
					.getActivator().getContext()
					.createFilter(filter != null ? "(&" + filter + "(" //$NON-NLS-1$ //$NON-NLS-2$
							+ Constants.OBJECTCLASS + "=" + clazz + "))" : "(" //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
							+ Constants.OBJECTCLASS + "=" + clazz + ")")); //$NON-NLS-1$ //$NON-NLS-2$
		} catch (final InvalidSyntaxException ise) {
			ise.printStackTrace();
			return null;
		}
	}

	/**
	 * 
	 * @param ref
	 *            the <code>RemoteServiceReference</code>.
	 * @return the service object or <code>null</code> if the service is not
	 *         (yet) present.
	 * @see ch.ethz.iks.r_osgi.RemoteOSGiService#getRemoteService(ch.ethz.iks.r_osgi.RemoteServiceReference)
	 * @category RemoteOSGiService
	 * @since 0.6
	 */
	public Object getRemoteService(final RemoteServiceReference ref) {
		if (ref == null) {
			throw new IllegalArgumentException("Remote Reference is null."); //$NON-NLS-1$
		}
		ServiceReference sref = getFetchedServiceReference(ref);
		if (sref == null) {
			fetchService(ref);
			sref = getFetchedServiceReference(ref);
		}
		return sref == null ? null : RemoteOSGiActivator.getActivator()
				.getContext().getService(sref);
	}

	/**
	 * 
	 * @throws InterruptedException
	 * @see ch.ethz.iks.r_osgi.RemoteOSGiService#getRemoteServiceBundle(ch.ethz.iks.r_osgi.RemoteServiceReference,
	 *      int)
	 * @since 1.0.0.RC4
	 */
	public Object getRemoteServiceBundle(final RemoteServiceReference ref,
			final int timeout) throws InterruptedException {
		if (ref == null) {
			throw new IllegalArgumentException("Remote Reference is null."); //$NON-NLS-1$
		}
		getChannel(ref.getURI()).getCloneBundle(ref);
		if (timeout < 0) {
			return null;
		}
		// TODO: FIXME use at least all service interfaces
		final ServiceTracker tracker = new ServiceTracker(RemoteOSGiActivator
				.getActivator().getContext(), ref.getServiceInterfaces()[0],
				null);
		tracker.open();
		tracker.waitForService(0);
		// TODO: FIXME compare that the service is actually the one from the
		// fetched
		// bundle!
		return tracker.getService();
	}

	/**
	 * 
	 * @param ref
	 *            the <code>RemoteServiceReference</code> to the service.
	 * @return the service reference of the service (or service proxy) or
	 *         <code>null</code> if the service is not (yet) present.
	 * @see ch.ethz.iks.r_osgi.RemoteOSGiService#getFetchedServiceReference(ch.ethz.iks.r_osgi.RemoteServiceReference)
	 * @category RemoteOSGiService
	 * @since 0.6
	 */
	private ServiceReference getFetchedServiceReference(
			final RemoteServiceReference ref) {
		try {
			final ServiceReference[] refs = RemoteOSGiActivator.getActivator()
					.getContext().getServiceReferences(
							ref.getServiceInterfaces()[0], "(" //$NON-NLS-1$
									+ SERVICE_URI + "=" + ref.getURI() + ")"); //$NON-NLS-1$ //$NON-NLS-2$
			if (refs != null) {
				return refs[0];
			}
		} catch (final InvalidSyntaxException doesNotHappen) {
			doesNotHappen.printStackTrace();
		}
		return null;
	}

	/**
	 * fetch the discovered remote service. The service will be fetched from the
	 * service providing host and a proxy bundle is registered with the local
	 * framework.
	 * 
	 * @param service
	 *            the <code>ServiceURL</code>.
	 * @throws RemoteOSGiException
	 *             if the fetching fails.
	 * @see ch.ethz.iks.r_osgi.RemoteOSGiService#getProxyBundle(ch.ethz.iks.slp.ServiceURL)
	 * @since 0.1
	 * @category RemoteOSGiService
	 */
	private void fetchService(final RemoteServiceReference ref)
			throws RemoteOSGiException {
		try {
			ChannelEndpointImpl channel;
			channel = ((RemoteServiceReferenceImpl) ref).getChannel();
			channel.getProxyBundle(ref);
		} catch (final UnknownHostException e) {
			throw new RemoteOSGiException(
					"Cannot resolve host " + ref.getURI(), e); //$NON-NLS-1$
		} catch (final IOException ioe) {
			throw new RemoteOSGiException("Proxy generation error", ioe); //$NON-NLS-1$
		}
	}

	static ChannelEndpointImpl getChannel(final URI uri) {
		return (ChannelEndpointImpl) channels.get(getChannelURI(uri));
	}

	/**
	 * 
	 * @see ch.ethz.iks.r_osgi.Remoting#getEndpoint(java.lang.String)
	 * @category Remoting
	 */
	public ChannelEndpoint getEndpoint(final String uri) {
		return getMultiplexer(uri);
	}

	/**
	 * 
	 * @see ch.ethz.iks.r_osgi.RemoteOSGiService#getEndpointManager(ch.ethz.iks.r_osgi.URI)
	 * @category Remoting
	 */
	public ChannelEndpointManager getEndpointManager(
			final URI remoteEndpointAddress) {
		return getMultiplexer(remoteEndpointAddress.toString());
	}

	/**
	 * 
	 * @param uri
	 * @return
	 */
	private ChannelEndpointMultiplexer getMultiplexer(final String uri) {
		final String channel = getChannelURI(URI.create(uri));
		ChannelEndpointMultiplexer multiplexer = (ChannelEndpointMultiplexer) multiplexers
				.get(channel);
		if (multiplexer == null || !multiplexer.isConnected()) {
			multiplexer = new ChannelEndpointMultiplexer(
					(ChannelEndpointImpl) channels.get(channel));
			multiplexers.put(channel, multiplexer);
		}
		return multiplexer;
	}

	/**
	 * 
	 * @param serviceURI
	 * @return
	 */
	private static String getChannelURI(final URI serviceURI) {
		return URI.create(
				serviceURI.getScheme() + "://" + serviceURI.getHost() + ":" //$NON-NLS-1$ //$NON-NLS-2$
						+ serviceURI.getPort()).toString();
	}

	/**
	 * the method is called when the R-OSGi bundle is about to be stopped.
	 * removes all registered proxy bundles.
	 */
	void cleanup() {
		final ChannelEndpoint[] c = (ChannelEndpoint[]) channels.values()
				.toArray(new ChannelEndpoint[channels.size()]);
		channels.clear();
		for (int i = 0; i < c.length; i++) {
			c[i].dispose();
		}
		final Object[] factories = networkChannelFactoryTracker.getServices();
		if (factories != null) {
			for (int i = 0; i < factories.length; i++) {
				try {
					((NetworkChannelFactory) factories[i]).deactivate(this);
				} catch (final IOException ioe) {
					if (log != null) {
						log.log(LogService.LOG_ERROR, ioe.getMessage(), ioe);
					}
				}
			}
		}
		eventAdminTracker.close();
		remoteServiceTracker.close();
		serviceDiscoveryHandlerTracker.close();
		remoteServiceListenerTracker.close();
		networkChannelFactoryTracker.close();
	}

	/**
	 * get all provided (remote-enabled) services of this peer.
	 * 
	 * @return return the services.
	 */
	static RemoteServiceRegistration[] getServices(String transport) {
		List results = new ArrayList();
		synchronized (serviceRegistrations) {
			for (Iterator i = serviceRegistrations.keySet().iterator(); i.hasNext();) {
				ServiceReference ref = (ServiceReference) i.next();
				Object propVal = ref.getProperty(RemoteOSGiService.R_OSGi_REGISTRATION);
				if (transport == null || propVal == null || propVal.equals(Boolean.TRUE) || propVal.equals(transport))
					results.add(serviceRegistrations.get(ref));
			}
		}
		return (RemoteServiceRegistration[]) results.toArray(new RemoteServiceRegistration[results.size()]);
	}

	/**
	 * 
	 * @param serviceID
	 * @return
	 */
	static RemoteServiceRegistration getServiceRegistration(
			final String serviceID) {

		final String filter = "".equals(serviceID) ? null : '(' //$NON-NLS-1$
				+ Constants.SERVICE_ID + "=" + serviceID + ")"; //$NON-NLS-1$ //$NON-NLS-2$

		try {
			final ServiceReference[] refs = RemoteOSGiActivator.getActivator()
					.getContext().getAllServiceReferences((String) null, filter);
			if (refs == null) {
				if (log != null) {
					log.log(LogService.LOG_WARNING, "COULD NOT FIND " + filter); //$NON-NLS-1$
				}
				return null;
			}
			synchronized (serviceRegistrations) {
				return (RemoteServiceRegistration) serviceRegistrations.get(refs[0]);
			}
		} catch (final InvalidSyntaxException e) {
			e.printStackTrace();
			return null;
		}
	}

	/**
	 * get all topics for which event handlers are registered.
	 * 
	 * @return the topics.
	 */
	static String[] getTopics() {
		final Object[] topicLists = eventHandlerTracker.getServices();
		final List topics = new ArrayList();
		if (topicLists != null) {
			for (int i = 0; i < topicLists.length; i++) {
				topics.addAll((List) topicLists[i]);
			}
		}
		return (String[]) topics.toArray(new String[topics.size()]);
	}

	/**
	 * get the next transaction id.
	 * 
	 * @return the next xid.
	 */
	static synchronized int nextXid() {
		if (nextXid == -1) {
			nextXid = 0;
		}
		return (++nextXid);
	}

	/**
	 * register a channel.
	 * 
	 * @param channel
	 *            the local endpoint of the channel.
	 */
	static void registerChannelEndpoint(final ChannelEndpoint channel) {
		channels.put(channel.getRemoteAddress().toString(), channel);
	}

	/**
	 * unregister a channel.
	 * 
	 * @param channel
	 *            the local endpoint of the channel.
	 */
	static void unregisterChannelEndpoint(final String channelURI) {
		channels.remove(channelURI);
	}

	/**
	 * update the leases.
	 */
	static void updateLeases(final LeaseUpdateMessage msg) {
		final ChannelEndpointImpl[] endpoints = (ChannelEndpointImpl[]) channels
				.values().toArray(new ChannelEndpointImpl[channels.size()]);
		for (int i = 0; i < endpoints.length; i++) {
			endpoints[i].sendLeaseUpdate(msg);
		}
	}

	/**
	 * 
	 * @param event
	 */
	static void notifyRemoteServiceListeners(final RemoteServiceEvent event) {
		final ServiceReference[] refs = remoteServiceListenerTracker
				.getServiceReferences();
		if (refs == null) {
			return;
		}
		final Set serviceIfaces = new HashSet(Arrays.asList(event
				.getRemoteReference().getServiceInterfaces()));
		for (int i = 0; i < refs.length; i++) {
			final String[] ifaces = (String[]) refs[i]
					.getProperty(RemoteServiceListener.SERVICE_INTERFACES);
			if (ifaces == null) {
				match(refs[i], event);
			} else {
				for (int j = 0; j < ifaces.length; j++) {
					if (serviceIfaces.contains(ifaces[j])) {
						match(refs[i], event);
						break;
					}
				}
			}
		}
	}

	/**
	 * 
	 * @param ref
	 * @param event
	 */
	private static void match(final ServiceReference ref,
			final RemoteServiceEvent event) {
		final Filter filter = (Filter) ref
				.getProperty(RemoteServiceListener.FILTER);
		if (filter == null
				|| filter.match(((RemoteServiceReferenceImpl) event
						.getRemoteReference()).getProperties())) {
			final RemoteServiceListener listener = (RemoteServiceListener) remoteServiceListenerTracker
					.getService(ref);
			if (listener != null) {
				listener.remoteServiceEvent(event);
			}
		}
	}

	/**
	 * register a service with the remote service discovery layer.
	 */
	void registerWithServiceDiscovery(final RemoteServiceRegistration reg) {
		// register the service with all service
		// discovery handler
		final Object[] handler = serviceDiscoveryHandlerTracker.getServices();

		if (handler != null) {
			for (int i = 0; i < handler.length; i++) {
				final Dictionary props = reg.getProperties();
				((ServiceDiscoveryHandler) handler[i]).registerService(reg
						.getReference(), props, URI.create("r-osgi://" //$NON-NLS-1$
						+ RemoteOSGiServiceImpl.MY_ADDRESS + ":" //$NON-NLS-1$
						+ RemoteOSGiServiceImpl.R_OSGI_PORT + "#" //$NON-NLS-1$
						+ reg.getServiceID()));
			}
		}

	}

	/**
	 * unregister a service from the service discovery layer.
	 * 
	 * @param reg
	 *            the remote service registration.
	 */
	void unregisterFromServiceDiscovery(final RemoteServiceRegistration reg) {
		final Object[] handler = serviceDiscoveryHandlerTracker.getServices();
		if (handler != null) {
			for (int i = 0; i < handler.length; i++) {
				((ServiceDiscoveryHandler) handler[i]).unregisterService(reg
						.getReference());
			}
		}
	}

	static byte[] getBundle(final Bundle bundle) throws IOException {
		final byte[] buffer = new byte[BUFFER_SIZE];
		final CRC32 crc = getEntry == null ? null : new CRC32();
		final ByteArrayOutputStream out = getEntry == null ? new ByteArrayOutputStream(
				BUFFER_SIZE)
				: null;

		if (getEntry == null) {
			return getBundleConcierge(bundle, buffer, out);
		}

		try {
			// workaround for Eclipse
			final String prefix = getEntry.invoke(bundle,
					new Object[] { pkgAdmin.getExportedPackages(bundle)[0]
							.getName().replace('.', '/') }) == null ? "/bin" //$NON-NLS-1$
					: ""; //$NON-NLS-1$

			return generateBundle(bundle, prefix, buffer, crc);
		} catch (Exception e) {
			e.printStackTrace();
			throw new IOException(e.getMessage());
		}
	}

	static byte[][] getBundlesForPackages(final String[] packages)
			throws IOException {
		final HashSet visitedBundles = new HashSet(packages.length);
		final ArrayList bundleBytes = new ArrayList(packages.length);

		// we reuse the buffer and other objects during the iteration over
		// bundles and entries to improve
		// the performance on smaller VMs.
		final byte[] buffer = new byte[BUFFER_SIZE];
		final CRC32 crc = getEntry == null ? null : new CRC32();
		final ByteArrayOutputStream out = getEntry == null ? new ByteArrayOutputStream(
				BUFFER_SIZE)
				: null;

		// TODO: for R4, handle multiple versions
		for (int i = 0; i < packages.length; i++) {
			final Bundle bundle = pkgAdmin.getExportedPackage(packages[i])
					.getExportingBundle();
			if (visitedBundles.contains(bundle)) {
				continue;
			}
			visitedBundles.add(bundle);

			if (getEntry == null) {
				bundleBytes.add(getBundleConcierge(bundle, buffer, out));
			} else {

				// workaround for Eclipse
				try {
					final String prefix = getEntry.invoke(bundle,
							new Object[] { packages[i].replace('.', '/') }) == null ? "/bin" //$NON-NLS-1$
							: ""; //$NON-NLS-1$
					bundleBytes
							.add(generateBundle(bundle, prefix, buffer, crc));
				} catch (Exception e) {
					e.printStackTrace();
					throw new IOException(e.getMessage());
				}
			}

		}
		return (byte[][]) bundleBytes.toArray(new byte[bundleBytes.size()][]);
	}

	private static byte[] getBundleConcierge(final Bundle bundle,
			final byte[] buffer, final ByteArrayOutputStream out)
			throws IOException {
		FileInputStream in = new FileInputStream(new File(base, bundle
				.getBundleId()
				+ "/bundle"));
		out.reset();

		int read;
		while ((read = in.read(buffer, 0, 2048)) > -1) {
			out.write(buffer, 0, read);
		}

		return out.toByteArray();
	}

	static boolean checkPackageImport(final String pkg) {
		// TODO: use versions if on R4
		if (pkg.startsWith("org.osgi")) {
			return true;
		}
		return pkgAdmin.getExportedPackage(pkg) != null;
	}

	private static byte[] generateBundle(final Bundle bundle,
			final String prefix, final byte[] buffer, final CRC32 crc)
			throws Exception {

		final URL url = (URL) getEntry.invoke(bundle,
				new Object[] { SEPARATOR_CHAR + MANIFEST_FILE_NAME });
		final Manifest mf = new Manifest();
		mf.read(url.openStream());

		final ByteArrayOutputStream bout = new ByteArrayOutputStream();
		final JarOutputStream out = new JarOutputStream(bout, mf);

		scan(bundle, prefix, "", out, buffer, crc); //$NON-NLS-1$

		out.flush();
		out.finish();
		out.close();
		return bout.toByteArray();
	}

	private static void scan(final Bundle bundle, final String prefix,
			final String path, final JarOutputStream out, final byte[] buffer,
			final CRC32 crc) throws Exception {

		final Enumeration e = (Enumeration) getEntryPaths.invoke(bundle,
				new Object[] { prefix + SEPARATOR_CHAR + path });
		if (e == null) {
			return;
		}
		while (e.hasMoreElements()) {
			final String entry = ((String) e.nextElement()).substring(prefix
					.length());
			if (entry.equals(MANIFEST_FILE_NAME)) {
				continue;
			} else if (entry.endsWith(SEPARATOR_CHAR)) {
				scan(bundle, prefix, entry, out, buffer, crc);
			} else {
				final URL url = bundle.getResource(prefix + "/" + entry);
				final InputStream in = url.openStream();
				int read;
				int totallyRead = 0;

				final JarEntry jarEntry = new JarEntry(entry);
				out.putNextEntry(jarEntry);
				crc.reset();

				while ((read = in.read(buffer, 0, 2048)) > -1) {
					totallyRead += read;
					out.write(buffer, 0, read);
					crc.update(buffer, 0, read);
				}

				jarEntry.setSize(totallyRead);
				jarEntry.setCrc(crc.getValue());
				out.flush();
				out.closeEntry();
			}
		}
	} 

	/**
	 * 
	 * 
	 * @see ch.ethz.iks.r_osgi.Remoting#createEndpoint(ch.ethz.iks.r_osgi.channels.NetworkChannel)
	 * @category RemoteOSGiService
	 */
	public void createEndpoint(final NetworkChannel channel) {
		new ChannelEndpointImpl(channel);
	}

	/**
	 * 
	 * @see ch.ethz.iks.r_osgi.RemoteOSGiService#ungetRemoteService(ch.ethz.iks.r_osgi.RemoteServiceReference)
	 * @category RemoteOSGiService
	 */
	public void ungetRemoteService(
			final RemoteServiceReference remoteServiceReference) {
		((RemoteServiceReferenceImpl) remoteServiceReference).getChannel()
				.ungetRemoteService(remoteServiceReference.getURI());

	}

	public void asyncRemoteCall(URI service, String methodSignature,
			Object[] args, AsyncRemoteCallCallback callback) {
		final ChannelEndpointImpl endpoint = getChannel(service);
		endpoint.asyncRemoteCall(service.getFragment(), methodSignature, args,
				callback);
	}

}
