blob: 08a92909b4d33a77b7f3b84e5b748a5280e28da6 [file] [log] [blame]
/***************************************************************************
* Copyright (c) 2004 - 2008 Eike Stepper, Germany.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Eike Stepper - initial API and implementation
**************************************************************************/
package org.eclipse.spi.net4j;
import org.eclipse.net4j.buffer.BufferState;
import org.eclipse.net4j.buffer.IBuffer;
import org.eclipse.net4j.buffer.IBufferHandler;
import org.eclipse.net4j.channel.IChannelMultiplexer;
import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump;
import org.eclipse.net4j.util.concurrent.IWorkSerializer;
import org.eclipse.net4j.util.concurrent.QueueWorkerWorkSerializer;
import org.eclipse.net4j.util.concurrent.SynchronousWorkSerializer;
import org.eclipse.net4j.util.lifecycle.Lifecycle;
import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
import org.eclipse.net4j.util.om.trace.ContextTracer;
import org.eclipse.internal.net4j.bundle.OM;
import java.text.MessageFormat;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
/**
* @author Eike Stepper
* @since 2.0
*/
public class Channel extends Lifecycle implements InternalChannel
{
private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CHANNEL, Channel.class);
private String userID;
private InternalChannelMultiplexer channelMultiplexer;
private short id = IBuffer.NO_CHANNEL;
private ExecutorService receiveExecutor;
/**
* The external handler for buffers passed from the {@link #connector}.
*/
private IBufferHandler receiveHandler;
private IWorkSerializer receiveSerializer;
private transient Queue<IBuffer> sendQueue;
@ExcludeFromDump
private transient boolean inverseClosed;
public Channel()
{
}
public String getUserID()
{
return userID;
}
public void setUserID(String userID)
{
this.userID = userID;
}
public Location getLocation()
{
return channelMultiplexer.getLocation();
}
public boolean isClient()
{
return channelMultiplexer.isClient();
}
public boolean isServer()
{
return channelMultiplexer.isServer();
}
public IChannelMultiplexer getMultiplexer()
{
return channelMultiplexer;
}
public void setMultiplexer(IChannelMultiplexer channelMultiplexer)
{
this.channelMultiplexer = (InternalChannelMultiplexer)channelMultiplexer;
}
public short getID()
{
return id;
}
public void setID(short id)
{
checkArg(id != IBuffer.NO_CHANNEL, "id == IBuffer.NO_CHANNEL"); //$NON-NLS-1$
this.id = id;
}
public ExecutorService getReceiveExecutor()
{
return receiveExecutor;
}
public void setReceiveExecutor(ExecutorService receiveExecutor)
{
this.receiveExecutor = receiveExecutor;
}
public IBufferHandler getReceiveHandler()
{
return receiveHandler;
}
public void setReceiveHandler(IBufferHandler receiveHandler)
{
this.receiveHandler = receiveHandler;
}
public Queue<IBuffer> getSendQueue()
{
return sendQueue;
}
public void sendBuffer(IBuffer buffer)
{
handleBuffer(buffer);
}
public void handleBuffer(IBuffer buffer)
{
BufferState state = buffer.getState();
if (state != BufferState.PUTTING)
{
OM.LOG.warn("Ignoring buffer in state == " + state + ": " + this); //$NON-NLS-1$ //$NON-NLS-2$
return;
}
if (TRACER.isEnabled())
{
TRACER.format("Handling buffer: {0} --> {1}", buffer, this); //$NON-NLS-1$
}
if (sendQueue == null)
{
if (TRACER.isEnabled())
{
TRACER.trace("Ignoring buffer because sendQueue == null: " + this); //$NON-NLS-1$
}
buffer.release();
}
else
{
sendQueue.add(buffer);
channelMultiplexer.multiplexChannel(this);
}
}
public void handleBufferFromMultiplexer(IBuffer buffer)
{
if (receiveHandler != null)
{
if (TRACER.isEnabled())
{
TRACER.format("Handling buffer from multiplexer: {0} --> {1}", buffer, this); //$NON-NLS-1$
}
receiveSerializer.addWork(craeteReceiverWork(buffer));
}
else
{
// Shutting down
buffer.release();
}
}
protected ReceiverWork craeteReceiverWork(IBuffer buffer)
{
return new ReceiverWork(this, buffer);
}
public short getBufferCapacity()
{
return channelMultiplexer.getBufferCapacity();
}
public IBuffer provideBuffer()
{
return channelMultiplexer.provideBuffer();
}
public void retainBuffer(IBuffer buffer)
{
channelMultiplexer.retainBuffer(buffer);
}
@Override
public String toString()
{
return MessageFormat.format("Channel[{0}, {1}]", id, getLocation()); //$NON-NLS-1$
}
@Override
protected void doBeforeActivate() throws Exception
{
super.doBeforeActivate();
checkState(id != IBuffer.NO_CHANNEL, "channelID == NO_CHANNEL"); //$NON-NLS-1$
checkState(channelMultiplexer, "channelMultiplexer"); //$NON-NLS-1$
}
@Override
protected void doActivate() throws Exception
{
super.doActivate();
sendQueue = new ConcurrentLinkedQueue<IBuffer>();
if (receiveExecutor == null)
{
receiveSerializer = new SynchronousWorkSerializer();
}
else
{
// CompletionWorkSerializer throws "One command already pending"
// receiveSerializer = new CompletionWorkSerializer();
// receiveSerializer = new AsynchronousWorkSerializer(receiveExecutor);
// receiveSerializer = new SynchronousWorkSerializer();
class ChannelReceiveSerializer extends QueueWorkerWorkSerializer
{
@Override
protected String getThreadName()
{
return "ReceiveSerializer-" + Channel.this;
}
}
receiveSerializer = new ChannelReceiveSerializer();
}
}
@Override
protected void doDeactivate() throws Exception
{
if (!inverseClosed)
{
channelMultiplexer.closeChannel(this);
}
super.doDeactivate();
}
public void finishDeactivate(boolean inverse)
{
inverseClosed = inverse;
if (inverse)
{
LifecycleUtil.deactivate(receiveHandler);
deactivate();
}
receiveHandler = null;
if (receiveSerializer != null)
{
receiveSerializer.dispose();
receiveSerializer = null;
}
if (sendQueue != null)
{
sendQueue.clear();
sendQueue = null;
}
}
public void close()
{
deactivate();
}
/**
* @author Eike Stepper
*/
protected static class ReceiverWork implements Runnable
{
private final InternalChannel channel;
private final IBuffer buffer;
public ReceiverWork(InternalChannel channel, IBuffer buffer)
{
this.channel = channel;
this.buffer = buffer;
}
public void run()
{
IBufferHandler receiveHandler = channel.getReceiveHandler();
if (receiveHandler != null)
{
receiveHandler.handleBuffer(buffer);
}
else
{
// Shutting down
buffer.release();
}
}
}
}