blob: 4624ce2dda6cd595cd0ad2b3ef10d3b63e6887bc [file] [log] [blame]
/* Copyright (c) 2006-2009 Jan S. Rellermeyer
* Systems Group,
* Department of Computer Science, ETH Zurich.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* - Neither the name of ETH Zurich nor the names of its contributors may be
* used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
package ch.ethz.iks.r_osgi.impl;
import java.util.ArrayList;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.log.LogService;
import ch.ethz.iks.r_osgi.RemoteOSGiException;
import ch.ethz.iks.r_osgi.URI;
import ch.ethz.iks.r_osgi.channels.ChannelEndpoint;
import ch.ethz.iks.r_osgi.channels.ChannelEndpointManager;
import ch.ethz.iks.r_osgi.messages.RemoteOSGiMessage;
import ch.ethz.iks.r_osgi.types.Timestamp;
/**
* Channel endpoint multiplexer. <i>EXPERIMENTAL</i>
*
* @author Jan S. Rellermeyer, ETH Zurich
*/
final class ChannelEndpointMultiplexer implements ChannelEndpoint,
ChannelEndpointManager {
/**
* the primary channel.
*/
ChannelEndpointImpl primary;
/**
* the policies.
*/
private final HashMap policies = new HashMap(0);
/**
* the service registration.
*/
private ServiceRegistration reg;
/**
* the mappings.
*/
private final Map mappings = new HashMap();
/**
* create a new channel endpoint multiplexer.
*
* @param primary
* the primary channel endpoint.
*/
ChannelEndpointMultiplexer(final ChannelEndpointImpl primary) {
if (primary == null) {
throw new IllegalArgumentException(
"Multiplexer must not be constructed from NULL primary endpoint"); //$NON-NLS-1$
}
this.primary = primary;
}
/**
* dispose the multiplexer.
*/
public void dispose() {
}
/**
*
* @see ch.ethz.iks.r_osgi.channels.ChannelEndpoint#getPresentationProperties(java.lang.String)
*/
public Dictionary getPresentationProperties(final String service) {
return primary.getPresentationProperties(service);
}
/**
*
* @see ch.ethz.iks.r_osgi.channels.ChannelEndpoint#getProperties(java.lang.String)
*/
public Dictionary getProperties(final String service) {
return primary.getProperties(service);
}
/**
*
* @see ch.ethz.iks.r_osgi.channels.ChannelEndpoint#getRemoteAddress()
*/
public URI getRemoteAddress() {
return primary.getRemoteAddress();
}
/**
*
* @see ch.ethz.iks.r_osgi.channels.ChannelEndpoint#invokeMethod(java.lang.String,
* java.lang.String, java.lang.Object[])
*/
public Object invokeMethod(final String serviceURI,
final String methodSignature, final Object[] args) throws Throwable {
final Mapping mapping = (Mapping) mappings.get(serviceURI);
if (mapping == null) {
return primary.invokeMethod(serviceURI, methodSignature, args);
} else {
final Integer p = (Integer) policies.get(serviceURI);
if (p == null) {
return primary.invokeMethod(mapping.getMapped(primary),
methodSignature, args);
} else {
final int policy = p.intValue();
if (policy == LOADBALANCING_ANY_POLICY) {
final ChannelEndpoint endpoint = mapping.getAny();
try {
return endpoint.invokeMethod(mapping
.getMapped(endpoint), methodSignature, args);
} catch (final RemoteOSGiException e) {
final ChannelEndpointImpl next = mapping.getNext();
if (next != null) {
primary.untrackRegistration(serviceURI);
primary = next;
primary.trackRegistration(serviceURI, reg);
if (RemoteOSGiServiceImpl.DEBUG) {
RemoteOSGiServiceImpl.log.log(
LogService.LOG_INFO,
"DOING FAILOVER TO " //$NON-NLS-1$
+ primary.getRemoteAddress());
}
return primary.invokeMethod(mapping
.getMapped(primary), methodSignature, args);
}
dispose();
throw e;
}
} else {
try {
if (!primary.isConnected()) {
throw new RemoteOSGiException("channel went down"); //$NON-NLS-1$
}
return primary.invokeMethod(mapping.getMapped(primary),
methodSignature, args);
} catch (final RemoteOSGiException e) {
if (policy == FAILOVER_REDUNDANCY_POLICY) {
// do the failover
final ChannelEndpointImpl next = mapping.getNext();
if (next != null) {
primary.untrackRegistration(serviceURI);
primary = next;
primary.trackRegistration(serviceURI, reg);
if (RemoteOSGiServiceImpl.DEBUG) {
RemoteOSGiServiceImpl.log
.log(
LogService.LOG_INFO,
"DOING FAILOVER TO " //$NON-NLS-1$
+ primary
.getRemoteAddress());
}
return primary.invokeMethod(mapping
.getMapped(primary), methodSignature,
args);
}
}
dispose();
throw e;
}
}
}
}
}
/**
*
* @see ch.ethz.iks.r_osgi.channels.ChannelEndpoint#receivedMessage(ch.ethz.iks.r_osgi.messages.RemoteOSGiMessage)
*/
public void receivedMessage(final RemoteOSGiMessage msg) {
throw new IllegalArgumentException(
"Not supported through endpoint multiplexer"); //$NON-NLS-1$
}
/**
*
* @see ch.ethz.iks.r_osgi.channels.ChannelEndpoint#trackRegistration(java.lang.String,
* org.osgi.framework.ServiceRegistration)
*/
public void trackRegistration(final String service,
final ServiceRegistration sreg) {
reg = sreg;
primary.trackRegistration(service, sreg);
}
/**
*
* @see ch.ethz.iks.r_osgi.channels.ChannelEndpoint#untrackRegistration(java.lang.String)
*/
public void untrackRegistration(final String service) {
primary.untrackRegistration(service);
}
public boolean isConnected() {
return primary.isConnected();
}
/**
* Mapping.
*
*/
private class Mapping {
private final Random random = new Random(System.currentTimeMillis());
private final List redundant = new ArrayList(0);
private final Map uriMapping = new HashMap(0);
Mapping(final String serviceURI) {
uriMapping.put(primary, serviceURI);
}
void addRedundant(final String redundantServiceURI,
final ChannelEndpoint endpoint) {
redundant.add(endpoint);
uriMapping.put(endpoint, redundantServiceURI);
}
void removeRedundant(final ChannelEndpoint endpoint) {
redundant.remove(endpoint);
uriMapping.remove(endpoint);
}
String getMapped(final ChannelEndpoint endpoint) {
return (String) uriMapping.get(endpoint);
}
ChannelEndpointImpl getNext() {
return (ChannelEndpointImpl) redundant.remove(0);
}
boolean isEmpty() {
return redundant.size() == 0;
}
ChannelEndpoint getAny() {
final int ran = random.nextInt(redundant.size() + 1);
if (ran == 0) {
return primary;
} else {
return (ChannelEndpoint) redundant.get(ran - 1);
}
}
}
/**
*
* @see ch.ethz.iks.r_osgi.channels.ChannelEndpointManager#addRedundantEndpoint(ch.ethz.iks.r_osgi.URI,
* ch.ethz.iks.r_osgi.URI)
*/
public void addRedundantEndpoint(final URI service,
final URI redundantService) {
final ChannelEndpoint redundantEndpoint = RemoteOSGiServiceImpl
.getChannel(redundantService);
primary.hasRedundantLinks = true;
Mapping mapping = (Mapping) mappings.get(service);
if (mapping == null) {
mapping = new Mapping(service.toString());
mappings.put(service.toString(), mapping);
}
mapping.addRedundant(redundantService.toString(), redundantEndpoint);
}
/**
*
* @see ch.ethz.iks.r_osgi.channels.ChannelEndpointManager#getLocalAddress()
*/
public URI getLocalAddress() {
return primary.getLocalAddress();
}
/**
*
* @see ch.ethz.iks.r_osgi.channels.ChannelEndpointManager#removeRedundantEndpoint(ch.ethz.iks.r_osgi.URI,
* ch.ethz.iks.r_osgi.URI)
*/
public void removeRedundantEndpoint(final URI service,
final URI redundantService) {
final ChannelEndpoint redundantEndpoint = RemoteOSGiServiceImpl
.getChannel(redundantService);
final Mapping mapping = (Mapping) mappings.get(service.toString());
mapping.removeRedundant(redundantEndpoint);
if (mapping.isEmpty()) {
mappings.remove(service);
primary.hasRedundantLinks = false;
}
}
/**
*
* @see ch.ethz.iks.r_osgi.channels.ChannelEndpointManager#setEndpointPolicy(ch.ethz.iks.r_osgi.URI,
* int)
*/
public void setEndpointPolicy(final URI service, final int policy) {
policies.put(service.toString(), Integer.valueOf(policy));
}
/**
* transform a timestamp into the peer's local time.
*
* @param timestamp
* the Timestamp.
* @return the transformed timestamp.
* @throws RemoteOSGiException
* if the transformation fails.
* @since 0.2
*/
public Timestamp transformTimestamp(final Timestamp timestamp)
throws RemoteOSGiException {
return primary.getOffset().transform(timestamp);
}
}