| // |
| // ======================================================================== |
| // Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd. |
| // ------------------------------------------------------------------------ |
| // All rights reserved. This program and the accompanying materials |
| // are made available under the terms of the Eclipse Public License v1.0 |
| // and Apache License v2.0 which accompanies this distribution. |
| // |
| // The Eclipse Public License is available at |
| // http://www.eclipse.org/legal/epl-v10.html |
| // |
| // The Apache License v2.0 is available at |
| // http://www.opensource.org/licenses/apache2.0.php |
| // |
| // You may elect to redistribute this code under either of these licenses. |
| // ======================================================================== |
| // |
| |
| package org.eclipse.jetty.proxy; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.WritePendingException; |
| |
| import javax.servlet.ReadListener; |
| import javax.servlet.ServletConfig; |
| import javax.servlet.ServletException; |
| import javax.servlet.ServletInputStream; |
| import javax.servlet.ServletOutputStream; |
| import javax.servlet.WriteListener; |
| import javax.servlet.http.HttpServletRequest; |
| import javax.servlet.http.HttpServletResponse; |
| |
| import org.eclipse.jetty.client.api.ContentProvider; |
| import org.eclipse.jetty.client.api.Request; |
| import org.eclipse.jetty.client.api.Response; |
| import org.eclipse.jetty.client.util.DeferredContentProvider; |
| import org.eclipse.jetty.util.Callback; |
| import org.eclipse.jetty.util.IteratingCallback; |
| |
| /** |
| * <p>Servlet 3.1 asynchronous proxy servlet.</p> |
| * <p>Both the request processing and the I/O are asynchronous.</p> |
| * |
| * @see ProxyServlet |
| * @see AsyncMiddleManServlet |
| * @see ConnectHandler |
| */ |
| public class AsyncProxyServlet extends ProxyServlet |
| { |
| private static final String WRITE_LISTENER_ATTRIBUTE = AsyncProxyServlet.class.getName() + ".writeListener"; |
| |
| @Override |
| protected ContentProvider proxyRequestContent(HttpServletRequest request, HttpServletResponse response, Request proxyRequest) throws IOException |
| { |
| ServletInputStream input = request.getInputStream(); |
| DeferredContentProvider provider = new DeferredContentProvider(); |
| input.setReadListener(newReadListener(request, response, proxyRequest, provider)); |
| return provider; |
| } |
| |
| protected ReadListener newReadListener(HttpServletRequest request, HttpServletResponse response, Request proxyRequest, DeferredContentProvider provider) |
| { |
| return new StreamReader(request, response, proxyRequest, provider); |
| } |
| |
| @Override |
| protected void onResponseContent(HttpServletRequest request, HttpServletResponse response, Response proxyResponse, byte[] buffer, int offset, int length, Callback callback) |
| { |
| try |
| { |
| if (_log.isDebugEnabled()) |
| _log.debug("{} proxying content to downstream: {} bytes", getRequestId(request), length); |
| StreamWriter writeListener = (StreamWriter)request.getAttribute(WRITE_LISTENER_ATTRIBUTE); |
| if (writeListener == null) |
| { |
| writeListener = newWriteListener(request, proxyResponse); |
| request.setAttribute(WRITE_LISTENER_ATTRIBUTE, writeListener); |
| |
| // Set the data to write before calling setWriteListener(), because |
| // setWriteListener() may trigger the call to onWritePossible() on |
| // a different thread and we would have a race. |
| writeListener.data(buffer, offset, length, callback); |
| |
| // Setting the WriteListener triggers an invocation to onWritePossible(). |
| response.getOutputStream().setWriteListener(writeListener); |
| } |
| else |
| { |
| writeListener.data(buffer, offset, length, callback); |
| writeListener.onWritePossible(); |
| } |
| } |
| catch (Throwable x) |
| { |
| callback.failed(x); |
| proxyResponse.abort(x); |
| } |
| } |
| |
| protected StreamWriter newWriteListener(HttpServletRequest request, Response proxyResponse) |
| { |
| return new StreamWriter(request, proxyResponse); |
| } |
| |
| /** |
| * <p>Convenience extension of {@link AsyncProxyServlet} that offers transparent proxy functionalities.</p> |
| * |
| * @see org.eclipse.jetty.proxy.AbstractProxyServlet.TransparentDelegate |
| */ |
| public static class Transparent extends AsyncProxyServlet |
| { |
| private final TransparentDelegate delegate = new TransparentDelegate(this); |
| |
| @Override |
| public void init(ServletConfig config) throws ServletException |
| { |
| super.init(config); |
| delegate.init(config); |
| } |
| |
| @Override |
| protected String rewriteTarget(HttpServletRequest clientRequest) |
| { |
| return delegate.rewriteTarget(clientRequest); |
| } |
| } |
| |
| protected class StreamReader extends IteratingCallback implements ReadListener |
| { |
| private final byte[] buffer = new byte[getHttpClient().getRequestBufferSize()]; |
| private final HttpServletRequest request; |
| private final HttpServletResponse response; |
| private final Request proxyRequest; |
| private final DeferredContentProvider provider; |
| |
| protected StreamReader(HttpServletRequest request, HttpServletResponse response, Request proxyRequest, DeferredContentProvider provider) |
| { |
| this.request = request; |
| this.response = response; |
| this.proxyRequest = proxyRequest; |
| this.provider = provider; |
| } |
| |
| @Override |
| public void onDataAvailable() throws IOException |
| { |
| iterate(); |
| } |
| |
| @Override |
| public void onAllDataRead() throws IOException |
| { |
| if (_log.isDebugEnabled()) |
| _log.debug("{} proxying content to upstream completed", getRequestId(request)); |
| provider.close(); |
| } |
| |
| @Override |
| public void onError(Throwable t) |
| { |
| onClientRequestFailure(request, proxyRequest, response, t); |
| } |
| |
| @Override |
| protected Action process() throws Exception |
| { |
| int requestId = _log.isDebugEnabled() ? getRequestId(request) : 0; |
| ServletInputStream input = request.getInputStream(); |
| |
| // First check for isReady() because it has |
| // side effects, and then for isFinished(). |
| while (input.isReady() && !input.isFinished()) |
| { |
| int read = input.read(buffer); |
| if (_log.isDebugEnabled()) |
| _log.debug("{} asynchronous read {} bytes on {}", requestId, read, input); |
| if (read > 0) |
| { |
| if (_log.isDebugEnabled()) |
| _log.debug("{} proxying content to upstream: {} bytes", requestId, read); |
| onRequestContent(request, proxyRequest, provider, buffer, 0, read, this); |
| return Action.SCHEDULED; |
| } |
| } |
| |
| if (input.isFinished()) |
| { |
| if (_log.isDebugEnabled()) |
| _log.debug("{} asynchronous read complete on {}", requestId, input); |
| return Action.SUCCEEDED; |
| } |
| else |
| { |
| if (_log.isDebugEnabled()) |
| _log.debug("{} asynchronous read pending on {}", requestId, input); |
| return Action.IDLE; |
| } |
| } |
| |
| protected void onRequestContent(HttpServletRequest request, Request proxyRequest, DeferredContentProvider provider, byte[] buffer, int offset, int length, Callback callback) |
| { |
| provider.offer(ByteBuffer.wrap(buffer, offset, length), callback); |
| } |
| |
| @Override |
| public void failed(Throwable x) |
| { |
| super.failed(x); |
| onError(x); |
| } |
| } |
| |
| protected class StreamWriter implements WriteListener |
| { |
| private final HttpServletRequest request; |
| private final Response proxyResponse; |
| private WriteState state; |
| private byte[] buffer; |
| private int offset; |
| private int length; |
| private Callback callback; |
| |
| protected StreamWriter(HttpServletRequest request, Response proxyResponse) |
| { |
| this.request = request; |
| this.proxyResponse = proxyResponse; |
| this.state = WriteState.IDLE; |
| } |
| |
| protected void data(byte[] bytes, int offset, int length, Callback callback) |
| { |
| if (state != WriteState.IDLE) |
| throw new WritePendingException(); |
| this.state = WriteState.READY; |
| this.buffer = bytes; |
| this.offset = offset; |
| this.length = length; |
| this.callback = callback; |
| } |
| |
| @Override |
| public void onWritePossible() throws IOException |
| { |
| int requestId = getRequestId(request); |
| ServletOutputStream output = request.getAsyncContext().getResponse().getOutputStream(); |
| if (state == WriteState.READY) |
| { |
| // There is data to write. |
| if (_log.isDebugEnabled()) |
| _log.debug("{} asynchronous write start of {} bytes on {}", requestId, length, output); |
| output.write(buffer, offset, length); |
| state = WriteState.PENDING; |
| if (output.isReady()) |
| { |
| if (_log.isDebugEnabled()) |
| _log.debug("{} asynchronous write of {} bytes completed on {}", requestId, length, output); |
| complete(); |
| } |
| else |
| { |
| if (_log.isDebugEnabled()) |
| _log.debug("{} asynchronous write of {} bytes pending on {}", requestId, length, output); |
| } |
| } |
| else if (state == WriteState.PENDING) |
| { |
| // The write blocked but is now complete. |
| if (_log.isDebugEnabled()) |
| _log.debug("{} asynchronous write of {} bytes completing on {}", requestId, length, output); |
| complete(); |
| } |
| else |
| { |
| throw new IllegalStateException(); |
| } |
| } |
| |
| protected void complete() |
| { |
| buffer = null; |
| offset = 0; |
| length = 0; |
| Callback c = callback; |
| callback = null; |
| state = WriteState.IDLE; |
| // Call the callback only after the whole state has been reset, |
| // because the callback may trigger a reentrant call and |
| // the state must already be the new one that we reset here. |
| c.succeeded(); |
| } |
| |
| @Override |
| public void onError(Throwable failure) |
| { |
| proxyResponse.abort(failure); |
| } |
| } |
| |
| private enum WriteState |
| { |
| READY, PENDING, IDLE |
| } |
| } |