blob: 0558cdbfd53c9b6448fd66fc73ba46ceeb23e370 [file] [log] [blame]
/*
* Copyright (c) 2008-2012, 2015 Eike Stepper (Berlin, Germany) and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Eike Stepper - initial API and implementation
*/
package org.eclipse.spi.net4j;
import org.eclipse.net4j.ITransportConfig;
import org.eclipse.net4j.Net4jUtil;
import org.eclipse.net4j.buffer.IBuffer;
import org.eclipse.net4j.channel.ChannelException;
import org.eclipse.net4j.channel.IChannel;
import org.eclipse.net4j.channel.IChannelMultiplexer;
import org.eclipse.net4j.protocol.IProtocol;
import org.eclipse.net4j.protocol.IProtocol2;
import org.eclipse.net4j.protocol.IProtocolProvider;
import org.eclipse.net4j.protocol.ProtocolVersionException;
import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump;
import org.eclipse.net4j.util.StringUtil;
import org.eclipse.net4j.util.concurrent.TimeoutRuntimeException;
import org.eclipse.net4j.util.container.Container;
import org.eclipse.net4j.util.factory.FactoryKey;
import org.eclipse.net4j.util.factory.IFactoryKey;
import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
import org.eclipse.net4j.util.om.trace.ContextTracer;
import org.eclipse.net4j.util.security.INegotiationContext;
import org.eclipse.internal.net4j.TransportConfig;
import org.eclipse.internal.net4j.bundle.OM;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
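/**
* Abstract base implementation of {@link InternalChannelMultiplexer} that manages the {@link IChannel channels} of one
* transport endpoint.
* <p>
* Channels are kept in a map keyed by their channel ID. IDs for locally opened channels are allocated by this class
* (positive on the client side, negative otherwise); channels opened by the peer are added under the ID the peer
* supplies. Subclasses provide the actual communication with the peer by implementing
* {@link #registerChannelWithPeer(short, long, IProtocol)} and {@link #deregisterChannelFromPeer(InternalChannel)}.
*
* @author Eike Stepper
* @since 2.0
*/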
public abstract class ChannelMultiplexer extends Container<IChannel>implements InternalChannelMultiplexer
{
// Tracer bound to the connector debug option; every use in this class is guarded by TRACER.isEnabled().
private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CONNECTOR, ChannelMultiplexer.class);
// Transport configuration; created lazily by getConfig() or replaced by a copy in setConfig().
private ITransportConfig config;
// Explicitly configured open-channel timeout in milliseconds. While it still equals DEFAULT_OPEN_CHANNEL_TIMEOUT,
// getOpenChannelTimeout() falls back to the "open.channel.timeout" debug option.
private long openChannelTimeout = IChannelMultiplexer.DEFAULT_OPEN_CHANNEL_TIMEOUT;
// All channels currently registered with this multiplexer, keyed by channel ID.
private ConcurrentMap<Short, IChannel> channels = new ConcurrentHashMap<Short, IChannel>();
// IDs handed out by getNextChannelID(). This set also serves as the monitor that guards ID allocation,
// channel removal and the snapshot/clear steps of doDeactivate().
@ExcludeFromDump
private transient Set<Short> channelIDs = new HashSet<Short>();
// Magnitude of the most recently allocated channel ID; only accessed while holding the channelIDs monitor.
@ExcludeFromDump
private transient int lastChannelID;
/**
* Creates a multiplexer without any channels. The transport configuration is not created here; it is either set
* through {@link #setConfig(ITransportConfig)} or created on the first call to {@link #getConfig()}.
*/
public ChannelMultiplexer()
{
}
/**
 * Returns the transport configuration of this multiplexer. If none has been set so far, a new
 * {@link TransportConfig} constructed with this multiplexer is created, remembered and returned.
 *
 * @return the current transport configuration
 */
public synchronized ITransportConfig getConfig()
{
  ITransportConfig current = config;
  if (current == null)
  {
    // Lazily create the configuration on first access.
    current = new TransportConfig(this);
    config = current;
  }

  return current;
}
/**
 * Replaces the transport configuration with a copy of the given one. {@link #checkInactive()} is called first, so
 * the configuration is only replaced if that check passes.
 *
 * @param config the configuration to copy via {@link Net4jUtil#copyTransportConfig}
 */
public synchronized void setConfig(ITransportConfig config)
{
  checkInactive();

  // Store a copy made for this multiplexer rather than the caller's instance.
  final ITransportConfig copy = Net4jUtil.copyTransportConfig(this, config);
  this.config = copy;
}
/**
 * Returns the timeout, in milliseconds, that applies to opening a channel.
 *
 * @return the explicitly configured timeout, or, while the timeout still equals
 *         {@link IChannelMultiplexer#DEFAULT_OPEN_CHANNEL_TIMEOUT}, the value of the
 *         <code>open.channel.timeout</code> debug option (falling back to 10000)
 */
public long getOpenChannelTimeout()
{
  if (openChannelTimeout != IChannelMultiplexer.DEFAULT_OPEN_CHANNEL_TIMEOUT)
  {
    return openChannelTimeout;
  }

  // Nothing configured explicitly: consult the bundle's debug options.
  return OM.BUNDLE.getDebugSupport().getDebugOption("open.channel.timeout", 10000); //$NON-NLS-1$
}
/**
* Sets the timeout used when opening channels.
*
* @param openChannelTimeout the timeout in milliseconds; passing
*          {@link IChannelMultiplexer#DEFAULT_OPEN_CHANNEL_TIMEOUT} makes {@link #getOpenChannelTimeout()} fall
*          back to the <code>open.channel.timeout</code> debug option
*/
public void setOpenChannelTimeout(long openChannelTimeout)
{
this.openChannelTimeout = openChannelTimeout;
}
/**
 * Looks up the channel registered under the given ID.
 *
 * @param channelID the ID of the channel to look up
 * @return the channel registered under <code>channelID</code>, or <code>null</code> if there is none
 */
public final InternalChannel getChannel(short channelID)
{
  final IChannel registered = channels.get(channelID);
  return (InternalChannel)registered;
}
/**
* Returns the channels currently registered with this multiplexer.
* <p>
* The result is the {@link ConcurrentMap#values() values view} of the internal channel map, not a copy, so it
* reflects later additions and removals. Use {@link #getElements()} for a snapshot.
*
* @return the live collection of registered channels
*/
public final Collection<IChannel> getChannels()
{
return channels.values();
}
/**
* Tells whether no channel is currently registered with this multiplexer.
*
* @return <code>true</code> if the internal channel map is empty
*/
@Override
public boolean isEmpty()
{
return channels.isEmpty();
}
/**
 * Returns a snapshot of the currently registered channels.
 *
 * @return a newly populated array holding the channels registered at the time of the call; empty if there are none
 */
public IChannel[] getElements()
{
  final Collection<IChannel> registered = getChannels();

  // Passing a zero-length array lets the collection allocate a correctly sized result itself.
  return registered.toArray(new IChannel[0]);
}
/**
 * Opens a channel that has no protocol attached.
 *
 * @return the newly opened channel
 * @throws ChannelException if the channel could not be opened
 */
public InternalChannel openChannel() throws ChannelException
{
  // The typed local selects the openChannel(IProtocol) overload.
  final IProtocol<?> noProtocol = null;
  return openChannel(noProtocol);
}
/**
 * Creates a protocol for the given ID via {@link #createProtocol(String, Object)} and opens a channel with it.
 *
 * @param protocolID the type of the protocol to create
 * @param infraStructure the infrastructure object handed to {@link #createProtocol(String, Object)}
 * @return the newly opened channel
 * @throws IllegalArgumentException if no protocol was created for <code>protocolID</code>
 * @throws ChannelException if the channel could not be opened
 */
public InternalChannel openChannel(String protocolID, Object infraStructure) throws ChannelException
{
  final IProtocol<?> created = createProtocol(protocolID, infraStructure);
  if (created != null)
  {
    return openChannel(created);
  }

  throw new IllegalArgumentException("Unknown protocolID: " + protocolID); //$NON-NLS-1$
}
/**
 * Opens a new channel with the given protocol and registers it with the peer.
 * <p>
 * The channel is created, initialized, given a freshly allocated ID and added to this multiplexer before it is
 * registered with the peer. The time spent on those local steps is deducted from the
 * {@link #getOpenChannelTimeout() open channel timeout} that is granted to the peer registration.
 * <p>
 * NOTE(review): if the peer registration fails, the channel added here is not removed again in this method -
 * confirm whether callers or subclasses are expected to clean up.
 *
 * @param protocol the protocol to attach to the channel, or <code>null</code> for a channel without protocol
 * @return the newly opened channel
 * @throws ChannelException if opening fails; any other exception raised during peer registration (including a
 *           timeout) is wrapped in a {@link ChannelException}
 */
public InternalChannel openChannel(IProtocol<?> protocol) throws ChannelException
{
  final long startMillis = System.currentTimeMillis();
  doBeforeOpenChannel(protocol);

  final InternalChannel opened = createChannel();
  initChannel(opened, protocol);

  final short id = getNextChannelID();
  opened.setID(id);
  addChannel(opened);

  try
  {
    registerWithinRemainingTimeout(id, startMillis, protocol);
  }
  catch (ChannelException ex)
  {
    throw ex;
  }
  catch (Exception ex)
  {
    throw new ChannelException(ex);
  }

  return opened;
}

/**
 * Registers the channel with the peer, granting only the part of the open channel timeout that is left since
 * <code>start</code>. A timeout is re-thrown with a message that names the complete timeout.
 */
private void registerWithinRemainingTimeout(short channelID, long start, IProtocol<?> protocol)
    throws ChannelException
{
  try
  {
    final long remaining = getOpenChannelTimeout() - System.currentTimeMillis() + start;
    if (remaining <= 0)
    {
      throw new TimeoutRuntimeException();
    }

    registerChannelWithPeer(channelID, remaining, protocol);
  }
  catch (TimeoutRuntimeException ex)
  {
    // Adjust the message for the complete timeout time
    final String message = "Channel registration timeout after " + getOpenChannelTimeout() + " milliseconds";
    throw new TimeoutRuntimeException(message, ex);
  }
}
/**
* Opens a channel on request of the peer without stating a protocol version; delegates to
* {@link #inverseOpenChannel(short, String, int)} with {@link IProtocol2#UNSPECIFIED_VERSION}.
*
* @param channelID the channel ID supplied by the peer
* @param protocolID the type of the protocol to create for the channel
* @return the newly added channel
* @deprecated Use {@link #inverseOpenChannel(short, String, int)}.
*/
@Deprecated
public InternalChannel inverseOpenChannel(short channelID, String protocolID)
{
return inverseOpenChannel(channelID, protocolID, IProtocol2.UNSPECIFIED_VERSION);
}
/**
 * Opens a channel on request of the peer. The protocol is created without infrastructure, its version is checked
 * against the given one, and the channel is added under the ID the peer supplied. No ID is allocated locally and
 * the channel is not registered with the peer.
 *
 * @param channelID the channel ID supplied by the peer
 * @param protocolID the type of the protocol to create for the channel
 * @param protocolVersion the protocol version to check the created protocol against
 * @return the newly added channel
 * @since 4.2
 */
public InternalChannel inverseOpenChannel(short channelID, String protocolID, int protocolVersion)
{
  final IProtocol<?> peerProtocol = createProtocol(protocolID, null);
  ProtocolVersionException.checkVersion(peerProtocol, protocolVersion);

  final InternalChannel added = createChannel();
  initChannel(added, peerProtocol);
  added.setID(channelID);
  addChannel(added);
  return added;
}
/**
 * Closes a channel: first deregisters it from the peer, then removes it from this multiplexer.
 * <p>
 * NOTE(review): if the deregistration throws, the local removal is skipped and the channel stays registered here -
 * confirm that this is intended.
 *
 * @param channel the channel to close
 * @throws ChannelException if the channel could not be deregistered from the peer
 */
public void closeChannel(InternalChannel channel) throws ChannelException
{
  deregisterChannelFromPeer(channel);
  removeChannel(channel);
}
/**
 * Closes a channel on request of the peer by deactivating the channel registered under the given ID.
 *
 * @param channelID the ID of the channel to deactivate
 * @throws ChannelException declared for implementors; not thrown directly by this implementation
 */
public void inverseCloseChannel(short channelID) throws ChannelException
{
  LifecycleUtil.deactivate(getChannel(channelID));
}
/**
* Creates the channel instance used by the <code>openChannel</code> and <code>inverseOpenChannel</code> methods.
* This implementation returns a new {@link Channel}; subclasses may override it to supply a different channel type.
*
* @return a new, not yet initialized channel
*/
protected InternalChannel createChannel()
{
return new Channel();
}
/**
 * Wires a freshly created channel to this multiplexer and, if given, to its protocol.
 * <p>
 * The channel always receives this multiplexer and the receive executor of the transport configuration. If a
 * protocol is given, it is told about the channel, activated, and installed as the receive handler of the channel.
 *
 * @param channel the channel to initialize
 * @param protocol the protocol to attach, or <code>null</code> for a channel without protocol
 */
protected void initChannel(InternalChannel channel, IProtocol<?> protocol)
{
  channel.setMultiplexer(this);
  channel.setReceiveExecutor(getConfig().getReceiveExecutor());

  if (protocol == null)
  {
    if (TRACER.isEnabled())
    {
      TRACER.trace("Opening channel without protocol"); //$NON-NLS-1$
    }

    return;
  }

  protocol.setChannel(channel);
  LifecycleUtil.activate(protocol);

  if (TRACER.isEnabled())
  {
    TRACER.format("Opening channel with protocol {0}", protocol.getType()); //$NON-NLS-1$
  }

  // The protocol becomes the receive handler only after it has been activated.
  channel.setReceiveHandler(protocol);
}
/**
 * Creates a protocol of the given type through the protocol provider of the transport configuration.
 *
 * @param type the protocol type to create
 * @param infraStructure the infrastructure to set on the created protocol; skipped if <code>null</code>
 * @return the created protocol, or <code>null</code> if <code>type</code> is empty
 * @throws ChannelException if no protocol provider is configured or the provider returns no protocol for
 *           <code>type</code>
 */
@SuppressWarnings("unchecked")
protected <INFRA_STRUCTURE> IProtocol<INFRA_STRUCTURE> createProtocol(String type, INFRA_STRUCTURE infraStructure)
{
  if (StringUtil.isEmpty(type))
  {
    // No protocol requested.
    return null;
  }

  final IProtocolProvider provider = getConfig().getProtocolProvider();
  if (provider == null)
  {
    throw new ChannelException("No protocol provider configured"); //$NON-NLS-1$
  }

  final IProtocol<INFRA_STRUCTURE> created = (IProtocol<INFRA_STRUCTURE>)provider.getProtocol(type);
  if (created == null)
  {
    throw new ChannelException("Invalid protocol factory: " + type); //$NON-NLS-1$
  }

  if (infraStructure != null)
  {
    created.setInfraStructure(infraStructure);
  }

  return created;
}
/**
 * Creates the factory key for a protocol of the given type, using the server or client protocol product group
 * depending on the location of this multiplexer.
 *
 * @param type the protocol type
 * @return a factory key combining the location-specific product group with <code>type</code>
 * @throws IllegalStateException if the location is neither <code>SERVER</code> nor <code>CLIENT</code>
 */
protected IFactoryKey createProtocolFactoryKey(String type)
{
  final String productGroup;
  switch (getLocation())
  {
  case SERVER:
    productGroup = ServerProtocolFactory.PRODUCT_GROUP;
    break;

  case CLIENT:
    productGroup = ClientProtocolFactory.PRODUCT_GROUP;
    break;

  default:
    throw new IllegalStateException();
  }

  return new FactoryKey(productGroup, type);
}
/**
* Hook that {@link #openChannel(IProtocol)} calls first, before a channel is created. This implementation does
* nothing; subclasses may override it.
*
* @param protocol the protocol passed to {@link #openChannel(IProtocol)}, possibly <code>null</code>
*/
protected void doBeforeOpenChannel(IProtocol<?> protocol)
{
// Do nothing
}
/**
 * Deactivates all registered channels and then drops every remaining channel registration, including the
 * reserved channel IDs, before delegating to the superclass.
 * <p>
 * The channels are deactivated outside the <code>channelIDs</code> monitor, on a snapshot taken while holding it.
 *
 * @throws Exception if the superclass deactivation fails
 */
@Override
protected void doDeactivate() throws Exception
{
  IChannel[] snapshot;
  synchronized (channelIDs)
  {
    snapshot = getElements();
  }

  for (IChannel channel : snapshot)
  {
    LifecycleUtil.deactivate(channel);
  }

  synchronized (channelIDs)
  {
    channels.clear();

    // Release the IDs as well; otherwise IDs of channels that did not remove themselves would stay reserved.
    channelIDs.clear();
  }

  super.doDeactivate();
}
/**
* Creates a negotiation context for this multiplexer. It is not called by the code of this class; its use is up to
* subclasses and their callers.
*
* @return the negotiation context
*/
protected abstract INegotiationContext createNegotiationContext();
/**
* Registers a locally opened channel with the peer. Called by {@link #openChannel(IProtocol)} after the channel has
* been added to this multiplexer.
*
* @param channelID the ID allocated for the new channel
* @param timeout the remaining part of the open channel timeout in milliseconds; always positive when called from
*          {@link #openChannel(IProtocol)}
* @param protocol the protocol of the channel, or <code>null</code> if the channel has no protocol
* @throws ChannelException if the registration fails
*/
protected abstract void registerChannelWithPeer(short channelID, long timeout, IProtocol<?> protocol)
throws ChannelException;
/**
* Deregisters a channel from the peer. Called by {@link #closeChannel(InternalChannel)} before the channel is
* removed from this multiplexer.
*
* @param channel the channel being closed
* @throws ChannelException if the deregistration fails
*/
protected abstract void deregisterChannelFromPeer(InternalChannel channel) throws ChannelException;
/**
 * Allocates and reserves the next free channel ID.
 * <p>
 * Candidates are tried in ascending order of magnitude, starting after the most recently allocated one and
 * wrapping from {@link Short#MAX_VALUE} back to 1. The resulting ID is positive on the client side and negative
 * otherwise. The search is bounded to exactly {@link Short#MAX_VALUE} candidates, so every possible ID is tried
 * once and the loop terminates even if all IDs are in use.
 *
 * @return the newly reserved channel ID
 * @throws ChannelException if all channel IDs are currently in use
 */
private short getNextChannelID()
{
  synchronized (channelIDs)
  {
    final int maxValue = Short.MAX_VALUE;
    for (int attempt = 0; attempt < maxValue; ++attempt)
    {
      // Advance to the next magnitude in the range 1..maxValue, wrapping around at the end.
      lastChannelID = lastChannelID >= maxValue ? 1 : lastChannelID + 1;

      short id = (short)(isClient() ? lastChannelID : -lastChannelID);
      if (channelIDs.add(id))
      {
        return id;
      }
    }

    // Every candidate has been tried once and all of them are taken.
    throw new ChannelException("Too many channels"); //$NON-NLS-1$
  }
}
/**
 * Registers the given channel under its ID, activates it and notifies listeners that an element was added.
 *
 * @param channel the channel to add; its ID must already be set
 * @throws ChannelException if the channel ID is <code>RESERVED_CHANNEL</code> or {@link IBuffer#NO_CHANNEL}
 */
private void addChannel(InternalChannel channel)
{
  final short id = channel.getID();
  final boolean usable = id != RESERVED_CHANNEL && id != IBuffer.NO_CHANNEL;
  if (!usable)
  {
    throw new ChannelException("Invalid channel ID: " + id); //$NON-NLS-1$
  }

  channels.put(id, channel);
  LifecycleUtil.activate(channel);
  fireElementAddedEvent(channel);
}
/**
 * Removes the given channel from this multiplexer and releases its ID. Listeners are notified, outside the
 * <code>channelIDs</code> monitor, only if a channel was actually registered under that ID.
 * <p>
 * Runtime exceptions are logged and then re-thrown.
 *
 * @param channel the channel to remove
 */
private void removeChannel(InternalChannel channel)
{
  try
  {
    final short id = channel.getID();
    boolean wasRegistered = false;
    synchronized (channelIDs)
    {
      if (channels.remove(id) != null)
      {
        channelIDs.remove(id);
        wasRegistered = true;
      }
    }

    if (wasRegistered)
    {
      fireElementRemovedEvent(channel);
    }
  }
  catch (RuntimeException ex)
  {
    OM.LOG.error(ex);
    throw ex;
  }
}
}