| /* |
| * Copyright (c) 2006-2015 Eike Stepper (Berlin, Germany) and others. |
| * All rights reserved. This program and the accompanying materials |
| * are made available under the terms of the Eclipse Public License v1.0 |
| * which accompanies this distribution, and is available at |
| * http://www.eclipse.org/legal/epl-v10.html |
| * |
| * Contributors: |
| * Eike Stepper - initial API and implementation |
| * Andre Dietisheim - maintenance |
| */ |
| package org.eclipse.net4j.signal; |
| |
| import org.eclipse.net4j.buffer.BufferInputStream; |
| import org.eclipse.net4j.buffer.IBuffer; |
| import org.eclipse.net4j.buffer.IBufferProvider; |
| import org.eclipse.net4j.channel.ChannelOutputStream; |
| import org.eclipse.net4j.channel.IChannel; |
| import org.eclipse.net4j.connector.IConnector; |
| import org.eclipse.net4j.util.WrappedException; |
| import org.eclipse.net4j.util.concurrent.ConcurrencyUtil; |
| import org.eclipse.net4j.util.event.Event; |
| import org.eclipse.net4j.util.event.IEvent; |
| import org.eclipse.net4j.util.event.IListener; |
| import org.eclipse.net4j.util.io.IORuntimeException; |
| import org.eclipse.net4j.util.io.IStreamWrapper; |
| import org.eclipse.net4j.util.io.StreamWrapperChain; |
| import org.eclipse.net4j.util.lifecycle.LifecycleUtil; |
| import org.eclipse.net4j.util.om.log.OMLogger; |
| import org.eclipse.net4j.util.om.trace.ContextTracer; |
| |
| import org.eclipse.internal.net4j.bundle.OM; |
| |
| import org.eclipse.spi.net4j.InternalChannel; |
| import org.eclipse.spi.net4j.Protocol; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.nio.ByteBuffer; |
| import java.text.MessageFormat; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.concurrent.ExecutorService; |
| |
| /** |
| * The default implementation of a {@link ISignalProtocol signal protocol}. |
| * <p> |
| * On the {@link org.eclipse.net4j.ILocationAware.Location#SERVER receiver side(s)} of protocol the |
| * {@link #createSignalReactor(short) createSignalReactor()} method has to be overridden to |
| * create appropriate peer instances for incoming {@link Signal signals}. |
| * |
| * @author Eike Stepper |
| */ |
| public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> |
| implements ISignalProtocol<INFRA_STRUCTURE> |
| { |
| /** |
| * @since 2.0 |
| */ |
| public static final short SIGNAL_REMOTE_EXCEPTION = -1; |
| |
| /** |
| * @since 2.0 |
| */ |
| public static final short SIGNAL_MONITOR_CANCELED = -2; |
| |
| /** |
| * @since 2.0 |
| */ |
| public static final short SIGNAL_MONITOR_PROGRESS = -3; |
| |
| /** |
| * @since 4.1 |
| */ |
| public static final short SIGNAL_SET_TIMEOUT = -4; |
| |
| private static final int MIN_CORRELATION_ID = 1; |
| |
| private static final int MAX_CORRELATION_ID = Integer.MAX_VALUE; |
| |
| private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, SignalProtocol.class); |
| |
| private static final ContextTracer STREAM_TRACER = new ContextTracer(OM.DEBUG_BUFFER_STREAM, SignalProtocol.class); |
| |
| private long timeout = DEFAULT_TIMEOUT; |
| |
| private IStreamWrapper streamWrapper; |
| |
| private Map<Integer, Signal> signals = new HashMap<Integer, Signal>(); |
| |
| private int nextCorrelationID = MIN_CORRELATION_ID; |
| |
| private boolean failingOver; |
| |
/**
 * Creates a signal protocol of the given type.
 *
 * @param type the protocol type; passed unchanged to the super class constructor
 * @since 2.0
 */
public SignalProtocol(final String type)
{
  super(type);
}
| |
| /** |
| * @since 2.0 |
| */ |
| public long getTimeout() |
| { |
| return timeout; |
| } |
| |
| /** |
| * Equivalent to calling SignalProtocol.setTimeout(timeout, false). |
| * |
| * @since 2.0 |
| */ |
| public void setTimeout(long timeout) |
| { |
| setTimeout(timeout, false); |
| } |
| |
| /** |
| * Update the timeout used for signal end of stream waiting time. |
| * |
| * @param timeout the new timeout |
| * @param useOldTimeoutToSendNewOne true to use the old timeout, false to use the new specified one to sent to server side the new specified timeout |
| * @return true if the new specified timeout has correctly been sent |
| * |
| * NOTE: this second parameter is useful mostly for test to be able to set a to small timeout |
| * @since 4.4 |
| */ |
| public boolean setTimeout(long timeout, boolean useOldTimeoutToSendNewOne) |
| { |
| boolean timeoutSent = false; |
| long oldTimeout = this.timeout; |
| if (!useOldTimeoutToSendNewOne) |
| { |
| handleSetTimeOut(timeout); |
| } |
| |
| if (oldTimeout != timeout && isActive()) |
| { |
| timeoutSent = sendSetTimeout(); |
| } |
| |
| if (timeoutSent && useOldTimeoutToSendNewOne) |
| { |
| handleSetTimeOut(timeout); |
| } |
| |
| return timeoutSent; |
| } |
| |
| public IStreamWrapper getStreamWrapper() |
| { |
| return streamWrapper; |
| } |
| |
| public void setStreamWrapper(IStreamWrapper streamWrapper) |
| { |
| this.streamWrapper = streamWrapper; |
| } |
| |
| public void addStreamWrapper(IStreamWrapper streamWrapper) |
| { |
| if (this.streamWrapper == null) |
| { |
| this.streamWrapper = streamWrapper; |
| } |
| else |
| { |
| this.streamWrapper = new StreamWrapperChain(streamWrapper, this.streamWrapper); |
| } |
| } |
| |
| /** |
| * @since 2.0 |
| */ |
| public IChannel open(IConnector connector) |
| { |
| return connector.openChannel(this); |
| } |
| |
| /** |
| * @since 2.0 |
| */ |
| public void close() |
| { |
| LifecycleUtil.deactivate(this, OMLogger.Level.DEBUG); |
| } |
| |
| public boolean waitForSignals(long timeout) |
| { |
| synchronized (signals) |
| { |
| while (!signals.isEmpty()) |
| { |
| try |
| { |
| signals.wait(timeout); |
| } |
| catch (InterruptedException ex) |
| { |
| return false; |
| } |
| } |
| } |
| |
| return true; |
| } |
| |
| /** |
| * Handles a given (incoming) buffer. Creates a signal to act upon the given buffer or uses a previously created |
| * signal. |
| */ |
| public void handleBuffer(IBuffer buffer) |
| { |
| ByteBuffer byteBuffer = buffer.getByteBuffer(); |
| int correlationID = byteBuffer.getInt(); |
| if (TRACER.isEnabled()) |
| { |
| TRACER.trace("Received buffer for correlation " + correlationID); //$NON-NLS-1$ |
| } |
| |
| Signal signal; |
| boolean newSignalScheduled = false; |
| |
| synchronized (signals) |
| { |
| if (correlationID > 0) |
| { |
| // Incoming indication |
| signal = signals.get(-correlationID); |
| if (signal == null) |
| { |
| short signalID = byteBuffer.getShort(); |
| if (TRACER.isEnabled()) |
| { |
| TRACER.trace("Got signalID: " + signalID); //$NON-NLS-1$ |
| } |
| |
| signal = provideSignalReactor(signalID); |
| if (signal != null) |
| { |
| signal.setCorrelationID(-correlationID); |
| signal.setBufferInputStream(new SignalInputStream(getTimeout())); |
| if (signal instanceof IndicationWithResponse) |
| { |
| signal.setBufferOutputStream(new SignalOutputStream(-correlationID, signalID, false)); |
| } |
| |
| signals.put(-correlationID, signal); |
| getExecutorService().execute(signal); |
| newSignalScheduled = true; |
| } |
| } |
| } |
| else |
| { |
| // Incoming confirmation |
| signal = signals.get(-correlationID); |
| } |
| } |
| |
| if (signal != null) // Can be null after timeout |
| { |
| if (newSignalScheduled) |
| { |
| fireSignalScheduledEvent(signal); |
| } |
| |
| BufferInputStream inputStream = signal.getBufferInputStream(); |
| inputStream.handleBuffer(buffer); |
| } |
| else |
| { |
| if (TRACER.isEnabled()) |
| { |
| TRACER.trace("Discarding buffer"); //$NON-NLS-1$ |
| } |
| |
| buffer.release(); |
| } |
| } |
| |
| @Override |
| public String toString() |
| { |
| IChannel channel = getChannel(); |
| if (channel != null) |
| { |
| return MessageFormat.format("SignalProtocol[{0}, {1}, {2}]", channel.getID(), channel.getLocation(), getType()); //$NON-NLS-1$ |
| } |
| |
| return MessageFormat.format("SignalProtocol[{0}]", getType()); //$NON-NLS-1$ |
| } |
| |
| @Override |
| protected void doBeforeDeactivate() throws Exception |
| { |
| synchronized (signals) |
| { |
| // Wait at most 10 seconds for running signals to finish |
| int waitMillis = 10 * 1000; |
| long stop = System.currentTimeMillis() + waitMillis; |
| while (!signals.isEmpty() && System.currentTimeMillis() < stop) |
| { |
| signals.wait(1000L); |
| } |
| } |
| } |
| |
| @Override |
| protected void doDeactivate() throws Exception |
| { |
| try |
| { |
| synchronized (signals) |
| { |
| signals.clear(); |
| } |
| |
| IChannel channel = getChannel(); |
| if (channel != null) |
| { |
| channel.close(); |
| setChannel(null); |
| } |
| } |
| finally |
| { |
| super.doDeactivate(); |
| } |
| } |
| |
| @Override |
| protected void handleChannelDeactivation() |
| { |
| if (!failingOver) |
| { |
| super.handleChannelDeactivation(); |
| } |
| } |
| |
| protected final SignalReactor provideSignalReactor(short signalID) |
| { |
| if (!isActive()) |
| { |
| return null; |
| } |
| |
| switch (signalID) |
| { |
| case SIGNAL_REMOTE_EXCEPTION: |
| return new RemoteExceptionIndication(this); |
| |
| case SIGNAL_MONITOR_CANCELED: |
| return new MonitorCanceledIndication(this); |
| |
| case SIGNAL_MONITOR_PROGRESS: |
| return new MonitorProgressIndication(this); |
| |
| case SIGNAL_SET_TIMEOUT: |
| return new SetTimeoutIndication(this); |
| |
| default: |
| SignalReactor signal = createSignalReactor(signalID); |
| if (signal == null) |
| { |
| throw new IllegalArgumentException("Invalid signalID " + signalID); //$NON-NLS-1$ |
| } |
| |
| return signal; |
| } |
| } |
| |
| /** |
| * Returns a new signal instance to serve the given signal ID or <code>null</code> if the signal ID is invalid/unknown |
| * for this protocol. |
| */ |
| protected SignalReactor createSignalReactor(short signalID) |
| { |
| return null; |
| } |
| |
| /** |
| * Returns <code>true</code> by default, override to change this behaviour. |
| * |
| * @since 4.1 |
| */ |
| protected boolean isSendingTimeoutChanges() |
| { |
| return true; |
| } |
| |
| synchronized int getNextCorrelationID() |
| { |
| int correlationID = nextCorrelationID; |
| if (nextCorrelationID == MAX_CORRELATION_ID) |
| { |
| if (TRACER.isEnabled()) |
| { |
| TRACER.trace("Correlation ID wrap-around"); //$NON-NLS-1$ |
| } |
| |
| nextCorrelationID = MIN_CORRELATION_ID; |
| } |
| else |
| { |
| ++nextCorrelationID; |
| } |
| |
| return correlationID; |
| } |
| |
| InputStream wrapInputStream(InputStream in) throws IOException |
| { |
| if (streamWrapper != null) |
| { |
| in = streamWrapper.wrapInputStream(in); |
| } |
| |
| return in; |
| } |
| |
| OutputStream wrapOutputStream(OutputStream out) throws IOException |
| { |
| if (streamWrapper != null) |
| { |
| out = streamWrapper.wrapOutputStream(out); |
| } |
| |
| return out; |
| } |
| |
| void finishInputStream(InputStream in) throws IOException |
| { |
| if (streamWrapper != null) |
| { |
| streamWrapper.finishInputStream(in); |
| } |
| } |
| |
| void finishOutputStream(OutputStream out) throws IOException |
| { |
| if (streamWrapper != null) |
| { |
| streamWrapper.finishOutputStream(out); |
| } |
| } |
| |
| void startSignal(SignalActor signalActor, long timeout) throws Exception |
| { |
| checkArg(signalActor.getProtocol() == this, "Wrong protocol"); //$NON-NLS-1$ |
| short signalID = signalActor.getID(); |
| int correlationID = signalActor.getCorrelationID(); |
| signalActor.setBufferOutputStream(new SignalOutputStream(correlationID, signalID, true)); |
| if (signalActor instanceof RequestWithConfirmation<?>) |
| { |
| signalActor.setBufferInputStream(new SignalInputStream(timeout)); |
| } |
| |
| synchronized (signals) |
| { |
| signals.put(correlationID, signalActor); |
| } |
| |
| fireSignalScheduledEvent(signalActor); |
| signalActor.runSync(); |
| } |
| |
| void stopSignal(Signal signal, Exception exception) |
| { |
| int correlationID = signal.getCorrelationID(); |
| synchronized (signals) |
| { |
| signals.remove(correlationID); |
| signals.notifyAll(); |
| } |
| |
| fireSignalFinishedEvent(signal, exception); |
| } |
| |
| void handleRemoteException(int correlationID, Throwable t, boolean responding) |
| { |
| synchronized (signals) |
| { |
| Signal signal = signals.remove(correlationID); |
| if (signal instanceof RequestWithConfirmation<?>) |
| { |
| RequestWithConfirmation<?> request = (RequestWithConfirmation<?>)signal; |
| request.setRemoteException(t, responding); |
| } |
| |
| signals.notifyAll(); |
| } |
| } |
| |
| void handleMonitorProgress(int correlationID, double totalWork, double work) |
| { |
| synchronized (signals) |
| { |
| Signal signal = signals.get(correlationID); |
| if (signal instanceof RequestWithMonitoring<?>) |
| { |
| RequestWithMonitoring<?> request = (RequestWithMonitoring<?>)signal; |
| request.setMonitorProgress(totalWork, work); |
| } |
| } |
| } |
| |
| void handleMonitorCanceled(int correlationID) |
| { |
| synchronized (signals) |
| { |
| Signal signal = signals.get(-correlationID); |
| if (signal instanceof IndicationWithMonitoring) |
| { |
| IndicationWithMonitoring indication = (IndicationWithMonitoring)signal; |
| indication.setMonitorCanceled(); |
| } |
| } |
| } |
| |
| void handleSetTimeOut(long timeout) |
| { |
| long oldTimeout = this.timeout; |
| if (oldTimeout != timeout) |
| { |
| this.timeout = timeout; |
| fireEvent(new TimeoutChangedEvent(this, oldTimeout, timeout)); |
| } |
| } |
| |
| boolean sendSetTimeout() |
| { |
| boolean timeoutSent = false; |
| if (isSendingTimeoutChanges()) |
| { |
| try |
| { |
| timeoutSent = new SetTimeoutRequest(this, this.timeout).send(); |
| } |
| catch (Exception ex) |
| { |
| throw WrappedException.wrap(ex); |
| } |
| } |
| return timeoutSent; |
| } |
| |
| private void fireSignalScheduledEvent(Signal signal) |
| { |
| IListener[] listeners = getListeners(); |
| if (listeners != null) |
| { |
| IEvent event = new SignalScheduledEvent<INFRA_STRUCTURE>(this, signal); |
| fireEvent(event, listeners); |
| } |
| } |
| |
| private void fireSignalFinishedEvent(Signal signal, Exception exception) |
| { |
| IListener[] listeners = getListeners(); |
| if (listeners != null) |
| { |
| IEvent event = new SignalFinishedEvent<INFRA_STRUCTURE>(this, signal, exception); |
| fireEvent(event, listeners); |
| } |
| } |
| |
| /** |
| * An {@link IEvent event} fired from a {@link ISignalProtocol signal protocol} when the protocol {@link ISignalProtocol#setTimeout(long) timeout} |
| * has been changed. |
| * |
| * @author Eike Stepper |
| * @since 4.1 |
| */ |
| public static final class TimeoutChangedEvent extends Event |
| { |
| private static final long serialVersionUID = 1L; |
| |
| private long oldTimeout; |
| |
| private long newTimeout; |
| |
| private TimeoutChangedEvent(ISignalProtocol<?> source, long oldTimeout, long newTimeout) |
| { |
| super(source); |
| this.oldTimeout = oldTimeout; |
| this.newTimeout = newTimeout; |
| } |
| |
| @Override |
| public SignalProtocol<?> getSource() |
| { |
| return (SignalProtocol<?>)super.getSource(); |
| } |
| |
| public long getOldTimeout() |
| { |
| return oldTimeout; |
| } |
| |
| public long getNewTimeout() |
| { |
| return newTimeout; |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "TimeoutChangedEvent [oldTimeout=" + oldTimeout + ", newTimeout=" + newTimeout + ", source=" + source |
| + "]"; |
| } |
| |
| } |
| |
| /** |
| * @author Eike Stepper |
| */ |
| class SignalInputStream extends BufferInputStream |
| { |
| private long timeout; |
| |
| public SignalInputStream(long timeout) |
| { |
| this.timeout = timeout; |
| } |
| |
| @Override |
| public long getMillisBeforeTimeout() |
| { |
| return timeout; |
| } |
| |
| @Override |
| public int read() throws IOException |
| { |
| if (isCCAM()) |
| { |
| final InternalChannel channel = (InternalChannel)getChannel(); |
| |
| ExecutorService executorService = channel.getReceiveExecutor(); |
| executorService.submit(new Runnable() |
| { |
| public void run() |
| { |
| ConcurrencyUtil.sleep(500); |
| channel.close(); |
| } |
| }); |
| } |
| |
| return super.read(); |
| } |
| } |
| |
| /** |
| * @author Eike Stepper |
| */ |
| class SignalOutputStream extends ChannelOutputStream |
| { |
| public SignalOutputStream(final int correlationID, final short signalID, final boolean addSignalID) |
| { |
| super(getChannel(), new IBufferProvider() |
| { |
| private IBufferProvider delegate = getBufferProvider(); |
| |
| private boolean firstBuffer = addSignalID; |
| |
| public short getBufferCapacity() |
| { |
| return delegate.getBufferCapacity(); |
| } |
| |
| public IBuffer provideBuffer() |
| { |
| IChannel channel = getChannel(); |
| if (channel == null) |
| { |
| throw new IORuntimeException("No channel for protocol " + SignalProtocol.this); //$NON-NLS-1$ |
| } |
| |
| IBuffer buffer = delegate.provideBuffer(); |
| ByteBuffer byteBuffer = buffer.startPutting(channel.getID()); |
| if (STREAM_TRACER.isEnabled()) |
| { |
| STREAM_TRACER.trace("Providing buffer for correlation " + correlationID); //$NON-NLS-1$ |
| } |
| |
| byteBuffer.putInt(correlationID); |
| if (firstBuffer) |
| { |
| if (SignalProtocol.TRACER.isEnabled()) |
| { |
| STREAM_TRACER.trace("Put signal id " + signalID); //$NON-NLS-1$ |
| } |
| |
| byteBuffer.putShort(signalID); |
| } |
| |
| firstBuffer = false; |
| return buffer; |
| } |
| |
| public void retainBuffer(IBuffer buffer) |
| { |
| delegate.retainBuffer(buffer); |
| } |
| }); |
| } |
| } |
| } |