| // |
| // ======================================================================== |
| // Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd. |
| // ------------------------------------------------------------------------ |
| // All rights reserved. This program and the accompanying materials |
| // are made available under the terms of the Eclipse Public License v1.0 |
| // and Apache License v2.0 which accompanies this distribution. |
| // |
| // The Eclipse Public License is available at |
| // http://www.eclipse.org/legal/epl-v10.html |
| // |
| // The Apache License v2.0 is available at |
| // http://www.opensource.org/licenses/apache2.0.php |
| // |
| // You may elect to redistribute this code under either of these licenses. |
| // ======================================================================== |
| // |
| |
| package org.eclipse.jetty.io; |
| |
| import java.util.List; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import org.eclipse.jetty.util.Callback; |
| import org.eclipse.jetty.util.log.Log; |
| import org.eclipse.jetty.util.log.Logger; |
| import org.eclipse.jetty.util.thread.NonBlockingThread; |
| |
| /** |
| * <p>A convenience base implementation of {@link Connection}.</p> |
| * <p>This class uses the capabilities of the {@link EndPoint} API to provide a |
| * more traditional style of async reading. A call to {@link #fillInterested()} |
| * will schedule a callback to {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)} |
| * as appropriate.</p> |
| */ |
| public abstract class AbstractConnection implements Connection |
| { |
| private static final Logger LOG = Log.getLogger(AbstractConnection.class); |
| |
| public static final boolean EXECUTE_ONFILLABLE=true; |
| |
| private final List<Listener> listeners = new CopyOnWriteArrayList<>(); |
| private final AtomicReference<State> _state = new AtomicReference<>(IDLE); |
| private final long _created=System.currentTimeMillis(); |
| private final EndPoint _endPoint; |
| private final Executor _executor; |
| private final Callback _readCallback; |
| private final boolean _executeOnfillable; |
| private int _inputBufferSize=2048; |
| |
/**
 * Creates a connection that uses {@link #EXECUTE_ONFILLABLE} as its
 * {@code executeOnfillable} setting.
 *
 * @param endp the endpoint this connection is bound to
 * @param executor the executor used to dispatch callbacks; must not be {@code null}
 */
protected AbstractConnection(EndPoint endp, Executor executor)
{
    this(endp, executor, EXECUTE_ONFILLABLE);
}
| |
/**
 * Creates a connection bound to the given endpoint and executor.
 *
 * @param endp the endpoint this connection is bound to
 * @param executor the executor used to dispatch callbacks
 * @param executeOnfillable {@code true} to submit {@link #onFillable()} to the executor,
 *                          {@code false} to invoke it on the thread that enters the FILLING state
 * @throws IllegalArgumentException if {@code executor} is {@code null}
 */
protected AbstractConnection(EndPoint endp, Executor executor, final boolean executeOnfillable)
{
    if (executor == null)
    {
        throw new IllegalArgumentException("Executor must not be null!");
    }

    _executor = executor;
    _endPoint = endp;
    _executeOnfillable = executeOnfillable;
    _readCallback = new ReadCallback();
    _state.set(IDLE);
}
| |
| @Override |
| public void addListener(Listener listener) |
| { |
| listeners.add(listener); |
| } |
| |
| public int getInputBufferSize() |
| { |
| return _inputBufferSize; |
| } |
| |
| public void setInputBufferSize(int inputBufferSize) |
| { |
| _inputBufferSize = inputBufferSize; |
| } |
| |
| protected Executor getExecutor() |
| { |
| return _executor; |
| } |
| |
| protected void failedCallback(final Callback callback, final Throwable x) |
| { |
| if (NonBlockingThread.isNonBlockingThread()) |
| { |
| try |
| { |
| getExecutor().execute(new Runnable() |
| { |
| @Override |
| public void run() |
| { |
| callback.failed(x); |
| } |
| }); |
| } |
| catch(RejectedExecutionException e) |
| { |
| LOG.debug(e); |
| callback.failed(x); |
| } |
| } |
| else |
| { |
| callback.failed(x); |
| } |
| } |
| |
| /** |
| * <p>Utility method to be called to register read interest.</p> |
| * <p>After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)} |
| * will be called back as appropriate.</p> |
| * @see #onFillable() |
| */ |
| public void fillInterested() |
| { |
| if (LOG.isDebugEnabled()) |
| LOG.debug("fillInterested {}",this); |
| |
| while(true) |
| { |
| State state=_state.get(); |
| if (next(state,state.fillInterested())) |
| break; |
| } |
| } |
| |
| public void fillInterested(Callback callback) |
| { |
| if (LOG.isDebugEnabled()) |
| LOG.debug("fillInterested {}",this); |
| |
| while(true) |
| { |
| State state=_state.get(); |
| // TODO yuck |
| if (state instanceof FillingInterestedCallback && ((FillingInterestedCallback)state)._callback==callback) |
| break; |
| State next=new FillingInterestedCallback(callback,state); |
| if (next(state,next)) |
| break; |
| } |
| } |
| |
| /** |
| * <p>Callback method invoked when the endpoint is ready to be read.</p> |
| * @see #fillInterested() |
| */ |
| public abstract void onFillable(); |
| |
| /** |
| * <p>Callback method invoked when the endpoint failed to be ready to be read.</p> |
| * @param cause the exception that caused the failure |
| */ |
| protected void onFillInterestedFailed(Throwable cause) |
| { |
| if (LOG.isDebugEnabled()) |
| LOG.debug("{} onFillInterestedFailed {}", this, cause); |
| if (_endPoint.isOpen()) |
| { |
| boolean close = true; |
| if (cause instanceof TimeoutException) |
| close = onReadTimeout(); |
| if (close) |
| { |
| if (_endPoint.isOutputShutdown()) |
| _endPoint.close(); |
| else |
| _endPoint.shutdownOutput(); |
| } |
| } |
| |
| if (_endPoint.isOpen()) |
| fillInterested(); |
| } |
| |
| /** |
| * <p>Callback method invoked when the endpoint failed to be ready to be read after a timeout</p> |
| * @return true to signal that the endpoint must be closed, false to keep the endpoint open |
| */ |
| protected boolean onReadTimeout() |
| { |
| return true; |
| } |
| |
| @Override |
| public void onOpen() |
| { |
| if (LOG.isDebugEnabled()) |
| LOG.debug("onOpen {}", this); |
| |
| for (Listener listener : listeners) |
| listener.onOpened(this); |
| } |
| |
| @Override |
| public void onClose() |
| { |
| if (LOG.isDebugEnabled()) |
| LOG.debug("onClose {}",this); |
| |
| for (Listener listener : listeners) |
| listener.onClosed(this); |
| } |
| |
| @Override |
| public EndPoint getEndPoint() |
| { |
| return _endPoint; |
| } |
| |
| @Override |
| public void close() |
| { |
| getEndPoint().close(); |
| } |
| |
| @Override |
| public int getMessagesIn() |
| { |
| return -1; |
| } |
| |
| @Override |
| public int getMessagesOut() |
| { |
| return -1; |
| } |
| |
| @Override |
| public long getBytesIn() |
| { |
| return -1; |
| } |
| |
| @Override |
| public long getBytesOut() |
| { |
| return -1; |
| } |
| |
| @Override |
| public long getCreatedTimeStamp() |
| { |
| return _created; |
| } |
| |
| @Override |
| public String toString() |
| { |
| return String.format("%s@%x[%s,%s]", |
| getClass().getSimpleName(), |
| hashCode(), |
| _state.get(), |
| _endPoint); |
| } |
| |
| public boolean next(State state, State next) |
| { |
| if (next==null) |
| return true; |
| if(_state.compareAndSet(state,next)) |
| { |
| if (LOG.isDebugEnabled()) |
| LOG.debug("{}-->{} {}",state,next,this); |
| if (next!=state) |
| next.onEnter(AbstractConnection.this); |
| return true; |
| } |
| return false; |
| } |
| |
| private static final class IdleState extends State |
| { |
| private IdleState() |
| { |
| super("IDLE"); |
| } |
| |
| @Override |
| State fillInterested() |
| { |
| return FILL_INTERESTED; |
| } |
| } |
| |
| |
| private static final class FillInterestedState extends State |
| { |
| private FillInterestedState() |
| { |
| super("FILL_INTERESTED"); |
| } |
| |
| @Override |
| public void onEnter(AbstractConnection connection) |
| { |
| connection.getEndPoint().fillInterested(connection._readCallback); |
| } |
| |
| @Override |
| State fillInterested() |
| { |
| return this; |
| } |
| |
| @Override |
| public State onFillable() |
| { |
| return FILLING; |
| } |
| |
| @Override |
| State onFailed() |
| { |
| return IDLE; |
| } |
| } |
| |
| |
| private static final class RefillingState extends State |
| { |
| private RefillingState() |
| { |
| super("REFILLING"); |
| } |
| |
| @Override |
| State fillInterested() |
| { |
| return FILLING_FILL_INTERESTED; |
| } |
| |
| @Override |
| public State onFilled() |
| { |
| return IDLE; |
| } |
| } |
| |
| |
| private static final class FillingFillInterestedState extends State |
| { |
| private FillingFillInterestedState(String name) |
| { |
| super(name); |
| } |
| |
| @Override |
| State fillInterested() |
| { |
| return this; |
| } |
| |
| State onFilled() |
| { |
| return FILL_INTERESTED; |
| } |
| } |
| |
| |
| private static final class FillingState extends State |
| { |
| private FillingState() |
| { |
| super("FILLING"); |
| } |
| |
| @Override |
| public void onEnter(AbstractConnection connection) |
| { |
| if (connection._executeOnfillable) |
| connection.getExecutor().execute(connection._runOnFillable); |
| else |
| connection._runOnFillable.run(); |
| } |
| |
| @Override |
| State fillInterested() |
| { |
| return FILLING_FILL_INTERESTED; |
| } |
| |
| @Override |
| public State onFilled() |
| { |
| return IDLE; |
| } |
| } |
| |
| |
| public static class State |
| { |
| private final String _name; |
| State(String name) |
| { |
| _name=name; |
| } |
| |
| @Override |
| public String toString() |
| { |
| return _name; |
| } |
| |
| void onEnter(AbstractConnection connection) |
| { |
| } |
| |
| State fillInterested() |
| { |
| throw new IllegalStateException(this.toString()); |
| } |
| |
| State onFillable() |
| { |
| throw new IllegalStateException(this.toString()); |
| } |
| |
| State onFilled() |
| { |
| throw new IllegalStateException(this.toString()); |
| } |
| |
| State onFailed() |
| { |
| throw new IllegalStateException(this.toString()); |
| } |
| } |
| |
| |
| public static final State IDLE=new IdleState(); |
| |
| public static final State FILL_INTERESTED=new FillInterestedState(); |
| |
| public static final State FILLING=new FillingState(); |
| |
| public static final State REFILLING=new RefillingState(); |
| |
| public static final State FILLING_FILL_INTERESTED=new FillingFillInterestedState("FILLING_FILL_INTERESTED"); |
| |
| public class NestedState extends State |
| { |
| private final State _nested; |
| |
| NestedState(State nested) |
| { |
| super("NESTED("+nested+")"); |
| _nested=nested; |
| } |
| NestedState(String name,State nested) |
| { |
| super(name+"("+nested+")"); |
| _nested=nested; |
| } |
| |
| @Override |
| State fillInterested() |
| { |
| return new NestedState(_nested.fillInterested()); |
| } |
| |
| @Override |
| State onFillable() |
| { |
| return new NestedState(_nested.onFillable()); |
| } |
| |
| @Override |
| State onFilled() |
| { |
| return new NestedState(_nested.onFilled()); |
| } |
| } |
| |
| |
| public class FillingInterestedCallback extends NestedState |
| { |
| private final Callback _callback; |
| |
| FillingInterestedCallback(Callback callback,State nested) |
| { |
| super("FILLING_INTERESTED_CALLBACK",nested==FILLING?REFILLING:nested); |
| _callback=callback; |
| } |
| |
| @Override |
| void onEnter(final AbstractConnection connection) |
| { |
| Callback callback=new Callback() |
| { |
| @Override |
| public void succeeded() |
| { |
| while(true) |
| { |
| State state = connection._state.get(); |
| if (!(state instanceof NestedState)) |
| break; |
| State nested=((NestedState)state)._nested; |
| if (connection.next(state,nested)) |
| break; |
| } |
| _callback.succeeded(); |
| } |
| |
| @Override |
| public void failed(Throwable x) |
| { |
| while(true) |
| { |
| State state = connection._state.get(); |
| if (!(state instanceof NestedState)) |
| break; |
| State nested=((NestedState)state)._nested; |
| if (connection.next(state,nested)) |
| break; |
| } |
| _callback.failed(x); |
| } |
| }; |
| |
| connection.getEndPoint().fillInterested(callback); |
| } |
| } |
| |
| private final Runnable _runOnFillable = new Runnable() |
| { |
| @Override |
| public void run() |
| { |
| try |
| { |
| onFillable(); |
| } |
| finally |
| { |
| while(true) |
| { |
| State state=_state.get(); |
| if (next(state,state.onFilled())) |
| break; |
| } |
| } |
| } |
| }; |
| |
| |
| private class ReadCallback implements Callback |
| { |
| @Override |
| public void succeeded() |
| { |
| while(true) |
| { |
| State state=_state.get(); |
| if (next(state,state.onFillable())) |
| break; |
| } |
| } |
| |
| @Override |
| public void failed(final Throwable x) |
| { |
| _executor.execute(new Runnable() |
| { |
| @Override |
| public void run() |
| { |
| while(true) |
| { |
| State state=_state.get(); |
| if (next(state,state.onFailed())) |
| break; |
| } |
| onFillInterestedFailed(x); |
| } |
| }); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return String.format("AC.ReadCB@%x{%s}", AbstractConnection.this.hashCode(),AbstractConnection.this); |
| } |
| }; |
| } |