blob: dbe4faee4133164917ad34d8e8c108cc34aaf4e3 [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.client.io;
import java.io.EOFException;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.UpgradeException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.ClientUpgradeResponse;
import org.eclipse.jetty.websocket.common.AcceptHash;
import org.eclipse.jetty.websocket.common.SessionFactory;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser;
import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser.ParseException;
/**
* This is the initial connection handling that exists immediately after physical connection is established to
* destination server.
* <p>
* Eventually, upon successful Upgrade request/response, this connection swaps itself out for the
* WebSocektClientConnection handler.
*/
public class UpgradeConnection extends AbstractConnection implements Connection.UpgradeFrom
{
public class SendUpgradeRequest extends FutureCallback implements Runnable
{
private final Logger LOG = Log.getLogger(UpgradeConnection.SendUpgradeRequest.class);
@Override
public void run()
{
URI uri = connectPromise.getRequest().getRequestURI();
request.setRequestURI(uri);
UpgradeListener handshakeListener = connectPromise.getUpgradeListener();
if (handshakeListener != null)
{
handshakeListener.onHandshakeRequest(request);
}
String rawRequest = request.generate();
ByteBuffer buf = BufferUtil.toBuffer(rawRequest,StandardCharsets.UTF_8);
getEndPoint().write(this,buf);
}
@Override
public void succeeded()
{
if (LOG.isDebugEnabled())
{
LOG.debug("Upgrade Request Write Success");
}
// Writing the request header is complete.
super.succeeded();
state = State.RESPONSE;
// start the interest in fill
fillInterested();
}
@Override
public void failed(Throwable cause)
{
if (LOG.isDebugEnabled())
{
LOG.debug("Upgrade Request Write Failure",cause);
}
super.failed(cause);
state = State.FAILURE;
// Fail the connect promise when a fundamental exception during connect occurs.
connectPromise.failed(cause);
}
}
/** HTTP Response Code: 101 Switching Protocols */
private static final int SWITCHING_PROTOCOLS = 101;
private enum State
{
REQUEST,
RESPONSE,
FAILURE,
UPGRADE
}
private static final Logger LOG = Log.getLogger(UpgradeConnection.class);
private final ByteBufferPool bufferPool;
private final ConnectPromise connectPromise;
private final HttpResponseHeaderParser parser;
private State state = State.REQUEST;
private ClientUpgradeRequest request;
private ClientUpgradeResponse response;
public UpgradeConnection(EndPoint endp, Executor executor, ConnectPromise connectPromise)
{
super(endp,executor);
this.connectPromise = connectPromise;
this.bufferPool = connectPromise.getClient().getBufferPool();
this.request = connectPromise.getRequest();
// Setup the parser
this.parser = new HttpResponseHeaderParser(new ClientUpgradeResponse());
}
public void disconnect(boolean onlyOutput)
{
EndPoint endPoint = getEndPoint();
// We need to gently close first, to allow
// SSL close alerts to be sent by Jetty
if (LOG.isDebugEnabled())
{
LOG.debug("Shutting down output {}",endPoint);
}
endPoint.shutdownOutput();
if (!onlyOutput)
{
if (LOG.isDebugEnabled())
{
LOG.debug("Closing {}",endPoint);
}
endPoint.close();
}
}
private void failUpgrade(Throwable cause)
{
close();
connectPromise.failed(cause);
}
private void notifyConnect(ClientUpgradeResponse response)
{
connectPromise.setResponse(response);
UpgradeListener handshakeListener = connectPromise.getUpgradeListener();
if (handshakeListener != null)
{
handshakeListener.onHandshakeResponse(response);
}
}
@Override
public ByteBuffer onUpgradeFrom()
{
return connectPromise.getResponse().getRemainingBuffer();
}
@Override
public void onFillable()
{
if (LOG.isDebugEnabled())
{
LOG.debug("onFillable");
}
ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),false);
BufferUtil.clear(buffer);
try
{
read(buffer);
}
finally
{
bufferPool.release(buffer);
}
if (state == State.RESPONSE)
{
// Continue Reading
fillInterested();
}
else if (state == State.UPGRADE)
{
// Stop Reading, upgrade the connection now
upgradeConnection(response);
}
}
@Override
public void onOpen()
{
super.onOpen();
getExecutor().execute(new SendUpgradeRequest());
}
@Override
public void onClose()
{
if (LOG.isDebugEnabled())
{
LOG.debug("Closed connection {}",this);
}
super.onClose();
}
@Override
protected boolean onReadTimeout()
{
if (LOG.isDebugEnabled())
{
LOG.debug("Timeout on connection {}",this);
}
failUpgrade(new IOException("Timeout while performing WebSocket Upgrade"));
return super.onReadTimeout();
}
/**
* Read / Parse the waiting read/fill buffer
*
* @param buffer
* the buffer to fill into from the endpoint
*/
private void read(ByteBuffer buffer)
{
EndPoint endPoint = getEndPoint();
try
{
while (true)
{
int filled = endPoint.fill(buffer);
if (filled == 0)
{
return;
}
else if (filled < 0)
{
LOG.warn("read - EOF Reached");
state = State.FAILURE;
failUpgrade(new EOFException("Reading WebSocket Upgrade response"));
return;
}
else
{
if (LOG.isDebugEnabled())
{
LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
}
response = (ClientUpgradeResponse)parser.parse(buffer);
if (response != null)
{
// Got a response!
validateResponse(response);
notifyConnect(response);
state = State.UPGRADE;
return; // do no more reading
}
}
}
}
catch (IOException | ParseException e)
{
LOG.ignore(e);
state = State.FAILURE;
UpgradeException ue = new UpgradeException(request.getRequestURI(),e);
connectPromise.failed(ue);
disconnect(false);
}
catch (UpgradeException e)
{
LOG.ignore(e);
state = State.FAILURE;
connectPromise.failed(e);
disconnect(false);
}
}
private void upgradeConnection(ClientUpgradeResponse response)
{
EndPoint endp = getEndPoint();
Executor executor = getExecutor();
EventDriver websocket = connectPromise.getDriver();
WebSocketPolicy policy = websocket.getPolicy();
WebSocketClientConnection connection = new WebSocketClientConnection(endp,executor,connectPromise,policy);
SessionFactory sessionFactory = connectPromise.getClient().getSessionFactory();
WebSocketSession session = sessionFactory.createSession(request.getRequestURI(),websocket,connection);
session.setPolicy(policy);
session.setUpgradeRequest(request);
session.setUpgradeResponse(response);
connection.addListener(session);
connectPromise.setSession(session);
// Initialize / Negotiate Extensions
ExtensionStack extensionStack = new ExtensionStack(connectPromise.getClient().getExtensionFactory());
extensionStack.negotiate(response.getExtensions());
extensionStack.configure(connection.getParser());
extensionStack.configure(connection.getGenerator());
// Setup Incoming Routing
connection.setNextIncomingFrames(extensionStack);
extensionStack.setNextIncoming(session);
// Setup Outgoing Routing
session.setOutgoingHandler(extensionStack);
extensionStack.setNextOutgoing(connection);
session.addManaged(extensionStack);
connectPromise.getClient().addManaged(session);
// Now swap out the connection
endp.upgrade(connection);
}
private void validateResponse(ClientUpgradeResponse response)
{
// Validate Response Status Code
if (response.getStatusCode() != SWITCHING_PROTOCOLS)
{
// TODO: use jetty-http and org.eclipse.jetty.http.HttpStatus for more meaningful exception messages
throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Didn't switch protocols, expected status <" + SWITCHING_PROTOCOLS
+ ">, but got <" + response.getStatusCode() + ">");
}
// Validate Connection header
String connection = response.getHeader("Connection");
if (!"upgrade".equalsIgnoreCase(connection))
{
throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Connection is " + connection + " (expected upgrade)");
}
// Check the Accept hash
String reqKey = request.getKey();
String expectedHash = AcceptHash.hashKey(reqKey);
String respHash = response.getHeader("Sec-WebSocket-Accept");
response.setSuccess(true);
if (expectedHash.equalsIgnoreCase(respHash) == false)
{
response.setSuccess(false);
throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Invalid Sec-WebSocket-Accept hash");
}
// Parse extensions
List<ExtensionConfig> extensions = new ArrayList<>();
List<String> extValues = response.getHeaders("Sec-WebSocket-Extensions");
if (extValues != null)
{
for (String extVal : extValues)
{
QuotedStringTokenizer tok = new QuotedStringTokenizer(extVal,",");
while (tok.hasMoreTokens())
{
extensions.add(ExtensionConfig.parse(tok.nextToken()));
}
}
}
response.setExtensions(extensions);
}
}