| // |
| // ======================================================================== |
| // Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd. |
| // ------------------------------------------------------------------------ |
| // All rights reserved. This program and the accompanying materials |
| // are made available under the terms of the Eclipse Public License v1.0 |
| // and Apache License v2.0 which accompanies this distribution. |
| // |
| // The Eclipse Public License is available at |
| // http://www.eclipse.org/legal/epl-v10.html |
| // |
| // The Apache License v2.0 is available at |
| // http://www.opensource.org/licenses/apache2.0.php |
| // |
| // You may elect to redistribute this code under either of these licenses. |
| // ======================================================================== |
| // |
| |
| package org.eclipse.jetty.fcgi.client.http; |
| |
| import java.io.EOFException; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.AsynchronousCloseException; |
| import java.util.LinkedList; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.eclipse.jetty.client.HttpClient; |
| import org.eclipse.jetty.client.HttpConnection; |
| import org.eclipse.jetty.client.HttpDestination; |
| import org.eclipse.jetty.client.HttpExchange; |
| import org.eclipse.jetty.client.api.Connection; |
| import org.eclipse.jetty.client.api.Request; |
| import org.eclipse.jetty.client.api.Response; |
| import org.eclipse.jetty.fcgi.FCGI; |
| import org.eclipse.jetty.fcgi.generator.Flusher; |
| import org.eclipse.jetty.fcgi.parser.ClientParser; |
| import org.eclipse.jetty.http.HttpField; |
| import org.eclipse.jetty.http.HttpFields; |
| import org.eclipse.jetty.http.HttpHeader; |
| import org.eclipse.jetty.http.HttpHeaderValue; |
| import org.eclipse.jetty.io.AbstractConnection; |
| import org.eclipse.jetty.io.ByteBufferPool; |
| import org.eclipse.jetty.io.EndPoint; |
| import org.eclipse.jetty.util.BufferUtil; |
| import org.eclipse.jetty.util.CompletableCallback; |
| import org.eclipse.jetty.util.Promise; |
| import org.eclipse.jetty.util.log.Log; |
| import org.eclipse.jetty.util.log.Logger; |
| |
| public class HttpConnectionOverFCGI extends AbstractConnection implements Connection |
| { |
| private static final Logger LOG = Log.getLogger(HttpConnectionOverFCGI.class); |
| |
| private final LinkedList<Integer> requests = new LinkedList<>(); |
| private final Map<Integer, HttpChannelOverFCGI> channels = new ConcurrentHashMap<>(); |
| private final AtomicBoolean closed = new AtomicBoolean(); |
| private final Flusher flusher; |
| private final HttpDestination destination; |
| private final Promise<Connection> promise; |
| private final boolean multiplexed; |
| private final Delegate delegate; |
| private final ClientParser parser; |
| private ByteBuffer buffer; |
| |
| /** |
| * @deprecated use {@link #HttpConnectionOverFCGI(EndPoint, HttpDestination, Promise, boolean)} instead |
| */ |
| @Deprecated |
| public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, boolean multiplexed) |
| { |
| this(endPoint, destination, new Promise.Adapter<Connection>(), multiplexed); |
| throw new UnsupportedOperationException("Deprecated, use HttpConnectionOverFCGI(EndPoint, HttpDestination, Promise<Connection>, boolean) instead"); |
| } |
| |
| public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise, boolean multiplexed) |
| { |
| super(endPoint, destination.getHttpClient().getExecutor(), destination.getHttpClient().isDispatchIO()); |
| this.destination = destination; |
| this.promise = promise; |
| this.multiplexed = multiplexed; |
| this.flusher = new Flusher(endPoint); |
| this.delegate = new Delegate(destination); |
| this.parser = new ClientParser(new ResponseListener()); |
| requests.addLast(0); |
| } |
| |
| public HttpDestination getHttpDestination() |
| { |
| return destination; |
| } |
| |
| @Override |
| public void send(Request request, Response.CompleteListener listener) |
| { |
| delegate.send(request, listener); |
| } |
| |
| protected void send(HttpExchange exchange) |
| { |
| delegate.send(exchange); |
| } |
| |
| @Override |
| public void onOpen() |
| { |
| super.onOpen(); |
| fillInterested(); |
| promise.succeeded(this); |
| } |
| |
| @Override |
| public void onFillable() |
| { |
| HttpClient client = destination.getHttpClient(); |
| ByteBufferPool bufferPool = client.getByteBufferPool(); |
| buffer = bufferPool.acquire(client.getResponseBufferSize(), true); |
| process(); |
| } |
| |
| private void process() |
| { |
| if (readAndParse()) |
| { |
| HttpClient client = destination.getHttpClient(); |
| ByteBufferPool bufferPool = client.getByteBufferPool(); |
| bufferPool.release(buffer); |
| // Don't linger the buffer around if we are idle. |
| buffer = null; |
| } |
| } |
| |
| private boolean readAndParse() |
| { |
| EndPoint endPoint = getEndPoint(); |
| ByteBuffer buffer = this.buffer; |
| while (true) |
| { |
| try |
| { |
| if (parse(buffer)) |
| return false; |
| |
| int read = endPoint.fill(buffer); |
| if (LOG.isDebugEnabled()) // Avoid boxing of variable 'read'. |
| LOG.debug("Read {} bytes from {}", read, endPoint); |
| if (read > 0) |
| { |
| if (parse(buffer)) |
| return false; |
| } |
| else if (read == 0) |
| { |
| fillInterested(); |
| return true; |
| } |
| else |
| { |
| shutdown(); |
| return true; |
| } |
| } |
| catch (Exception x) |
| { |
| if (LOG.isDebugEnabled()) |
| LOG.debug(x); |
| close(x); |
| return false; |
| } |
| } |
| } |
| |
| private boolean parse(ByteBuffer buffer) |
| { |
| return parser.parse(buffer); |
| } |
| |
| private void shutdown() |
| { |
| // Close explicitly only if we are idle, since the request may still |
| // be in progress, otherwise close only if we can fail the responses. |
| if (channels.isEmpty()) |
| close(); |
| else |
| failAndClose(new EOFException()); |
| } |
| |
| @Override |
| protected boolean onReadTimeout() |
| { |
| close(new TimeoutException()); |
| return false; |
| } |
| |
| protected void release(HttpChannelOverFCGI channel) |
| { |
| channels.remove(channel.getRequest()); |
| destination.release(this); |
| } |
| |
| @Override |
| public void close() |
| { |
| close(new AsynchronousCloseException()); |
| } |
| |
| protected void close(Throwable failure) |
| { |
| if (closed.compareAndSet(false, true)) |
| { |
| // First close then abort, to be sure that the connection cannot be reused |
| // from an onFailure() handler or by blocking code waiting for completion. |
| getHttpDestination().close(this); |
| getEndPoint().shutdownOutput(); |
| if (LOG.isDebugEnabled()) |
| LOG.debug("{} oshut", this); |
| getEndPoint().close(); |
| if (LOG.isDebugEnabled()) |
| LOG.debug("{} closed", this); |
| |
| abort(failure); |
| } |
| } |
| |
| protected boolean closeByHTTP(HttpFields fields) |
| { |
| if (multiplexed) |
| return false; |
| if (!fields.contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString())) |
| return false; |
| close(); |
| return true; |
| } |
| |
| protected void abort(Throwable failure) |
| { |
| for (HttpChannelOverFCGI channel : channels.values()) |
| { |
| HttpExchange exchange = channel.getHttpExchange(); |
| if (exchange != null) |
| exchange.getRequest().abort(failure); |
| } |
| channels.clear(); |
| } |
| |
| private void failAndClose(Throwable failure) |
| { |
| boolean result = false; |
| for (HttpChannelOverFCGI channel : channels.values()) |
| result |= channel.responseFailure(failure); |
| if (result) |
| close(failure); |
| } |
| |
| private int acquireRequest() |
| { |
| synchronized (requests) |
| { |
| int last = requests.getLast(); |
| int request = last + 1; |
| requests.addLast(request); |
| return request; |
| } |
| } |
| |
| private void releaseRequest(int request) |
| { |
| synchronized (requests) |
| { |
| requests.removeFirstOccurrence(request); |
| } |
| } |
| |
| @Override |
| public String toString() |
| { |
| return String.format("%s@%h(l:%s <-> r:%s)", |
| getClass().getSimpleName(), |
| this, |
| getEndPoint().getLocalAddress(), |
| getEndPoint().getRemoteAddress()); |
| } |
| |
| private class Delegate extends HttpConnection |
| { |
| private Delegate(HttpDestination destination) |
| { |
| super(destination); |
| } |
| |
| @Override |
| protected void send(HttpExchange exchange) |
| { |
| Request request = exchange.getRequest(); |
| normalizeRequest(request); |
| |
| // FCGI may be multiplexed, so create one channel for each request. |
| int id = acquireRequest(); |
| HttpChannelOverFCGI channel = new HttpChannelOverFCGI(HttpConnectionOverFCGI.this, flusher, id, request.getIdleTimeout()); |
| channels.put(id, channel); |
| if (channel.associate(exchange)) |
| channel.send(); |
| else |
| channel.release(); |
| } |
| |
| @Override |
| public void close() |
| { |
| HttpConnectionOverFCGI.this.close(); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return HttpConnectionOverFCGI.this.toString(); |
| } |
| } |
| |
| private class ResponseListener implements ClientParser.Listener |
| { |
| @Override |
| public void onBegin(int request, int code, String reason) |
| { |
| HttpChannelOverFCGI channel = channels.get(request); |
| if (channel != null) |
| channel.responseBegin(code, reason); |
| else |
| noChannel(request); |
| } |
| |
| @Override |
| public void onHeader(int request, HttpField field) |
| { |
| HttpChannelOverFCGI channel = channels.get(request); |
| if (channel != null) |
| channel.responseHeader(field); |
| else |
| noChannel(request); |
| } |
| |
| @Override |
| public void onHeaders(int request) |
| { |
| HttpChannelOverFCGI channel = channels.get(request); |
| if (channel != null) |
| channel.responseHeaders(); |
| else |
| noChannel(request); |
| } |
| |
| @Override |
| public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) |
| { |
| switch (stream) |
| { |
| case STD_OUT: |
| { |
| HttpChannelOverFCGI channel = channels.get(request); |
| if (channel != null) |
| { |
| CompletableCallback callback = new CompletableCallback() |
| { |
| @Override |
| public void resume() |
| { |
| if (LOG.isDebugEnabled()) |
| LOG.debug("Content consumed asynchronously, resuming processing"); |
| process(); |
| } |
| |
| @Override |
| public void abort(Throwable x) |
| { |
| close(x); |
| } |
| }; |
| // Do not short circuit these calls. |
| boolean proceed = channel.content(buffer, callback); |
| boolean async = callback.tryComplete(); |
| return !proceed || async; |
| } |
| else |
| { |
| noChannel(request); |
| } |
| break; |
| } |
| case STD_ERR: |
| { |
| LOG.info(BufferUtil.toUTF8String(buffer)); |
| break; |
| } |
| default: |
| { |
| throw new IllegalArgumentException(); |
| } |
| } |
| return false; |
| } |
| |
| @Override |
| public void onEnd(int request) |
| { |
| HttpChannelOverFCGI channel = channels.get(request); |
| if (channel != null) |
| { |
| if (channel.responseSuccess()) |
| releaseRequest(request); |
| } |
| else |
| { |
| noChannel(request); |
| } |
| } |
| |
| @Override |
| public void onFailure(int request, Throwable failure) |
| { |
| HttpChannelOverFCGI channel = channels.get(request); |
| if (channel != null) |
| { |
| if (channel.responseFailure(failure)) |
| releaseRequest(request); |
| } |
| else |
| { |
| noChannel(request); |
| } |
| } |
| |
| private void noChannel(int request) |
| { |
| if (LOG.isDebugEnabled()) |
| LOG.debug("Channel not found for request {}", request); |
| } |
| } |
| } |