blob: bf9af834d94831cc03a14301f9fae2405454f077 [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.http;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.AbstractHttpClientServerTest;
import org.eclipse.jetty.client.ConnectionPool;
import org.eclipse.jetty.client.DuplexConnectionPool;
import org.eclipse.jetty.client.EmptyServerHandler;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
{
public HttpDestinationOverHTTPTest(SslContextFactory sslContextFactory)
{
super(sslContextFactory);
}
@Before
public void init() throws Exception
{
start(new EmptyServerHandler());
}
@Test
public void test_FirstAcquire_WithEmptyQueue() throws Exception
{
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort()));
destination.start();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Connection connection = connectionPool.acquire();
if (connection == null)
{
// There are no queued requests, so the newly created connection will be idle
connection = timedPoll(connectionPool.getIdleConnections(), 5, TimeUnit.SECONDS);
}
Assert.assertNotNull(connection);
}
@Test
public void test_SecondAcquire_AfterFirstAcquire_WithEmptyQueue_ReturnsSameConnection() throws Exception
{
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort()));
destination.start();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Connection connection1 = connectionPool.acquire();
if (connection1 == null)
{
// There are no queued requests, so the newly created connection will be idle
long start = System.nanoTime();
while (connection1 == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5)
{
TimeUnit.MILLISECONDS.sleep(50);
connection1 = connectionPool.getIdleConnections().peek();
}
Assert.assertNotNull(connection1);
Connection connection2 = connectionPool.acquire();
Assert.assertSame(connection1, connection2);
}
}
@Test
public void test_SecondAcquire_ConcurrentWithFirstAcquire_WithEmptyQueue_CreatesTwoConnections() throws Exception
{
final CountDownLatch idleLatch = new CountDownLatch(1);
final CountDownLatch latch = new CountDownLatch(1);
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort()))
{
@Override
protected ConnectionPool newConnectionPool(HttpClient client)
{
return new DuplexConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
{
@Override
protected void onCreated(Connection connection)
{
try
{
idleLatch.countDown();
latch.await(5, TimeUnit.SECONDS);
super.onCreated(connection);
}
catch (InterruptedException x)
{
x.printStackTrace();
}
}
};
}
};
destination.start();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Connection connection1 = connectionPool.acquire();
// Make sure we entered idleCreated().
Assert.assertTrue(idleLatch.await(5, TimeUnit.SECONDS));
// There are no available existing connections, so acquire()
// returns null because we delayed idleCreated() above
Assert.assertNull(connection1);
// Second attempt also returns null because we delayed idleCreated() above.
Connection connection2 = connectionPool.acquire();
Assert.assertNull(connection2);
latch.countDown();
// There must be 2 idle connections.
Queue<Connection> idleConnections = connectionPool.getIdleConnections();
Connection connection = timedPoll(idleConnections, 5, TimeUnit.SECONDS);
Assert.assertNotNull(connection);
connection = timedPoll(idleConnections, 5, TimeUnit.SECONDS);
Assert.assertNotNull(connection);
}
@Test
public void test_Acquire_Process_Release_Acquire_ReturnsSameConnection() throws Exception
{
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort()));
destination.start();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
HttpConnectionOverHTTP connection1 = (HttpConnectionOverHTTP)connectionPool.acquire();
long start = System.nanoTime();
while (connection1 == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5)
{
TimeUnit.MILLISECONDS.sleep(50);
connection1 = (HttpConnectionOverHTTP)connectionPool.getIdleConnections().peek();
}
Assert.assertNotNull(connection1);
// Acquire the connection to make it active
Assert.assertSame(connection1, connectionPool.acquire());
destination.process(connection1);
destination.release(connection1);
Connection connection2 = connectionPool.acquire();
Assert.assertSame(connection1, connection2);
}
@Test
public void test_IdleConnection_IdleTimeout() throws Exception
{
long idleTimeout = 1000;
client.setIdleTimeout(idleTimeout);
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort()));
destination.start();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Connection connection1 = connectionPool.acquire();
if (connection1 == null)
{
// There are no queued requests, so the newly created connection will be idle
long start = System.nanoTime();
while (connection1 == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5)
{
TimeUnit.MILLISECONDS.sleep(50);
connection1 = connectionPool.getIdleConnections().peek();
}
Assert.assertNotNull(connection1);
TimeUnit.MILLISECONDS.sleep(2 * idleTimeout);
connection1 = connectionPool.getIdleConnections().poll();
Assert.assertNull(connection1);
}
}
@Test
public void test_Request_Failed_If_MaxRequestsQueuedPerDestination_Exceeded() throws Exception
{
int maxQueued = 1;
client.setMaxRequestsQueuedPerDestination(maxQueued);
client.setMaxConnectionsPerDestination(1);
// Make one request to open the connection and be sure everything is setup properly
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send();
Assert.assertEquals(200, response.getStatus());
// Send another request that is sent immediately
final CountDownLatch successLatch = new CountDownLatch(1);
final CountDownLatch failureLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.path("/one")
.onRequestQueued(request ->
{
// This request exceeds the maximum queued, should fail
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.path("/two")
.send(result ->
{
Assert.assertTrue(result.isFailed());
Assert.assertThat(result.getRequestFailure(), Matchers.instanceOf(RejectedExecutionException.class));
failureLatch.countDown();
});
})
.send(result ->
{
if (result.isSucceeded())
successLatch.countDown();
});
Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testDestinationIsRemoved() throws Exception
{
String host = "localhost";
int port = connector.getLocalPort();
Destination destinationBefore = client.getDestination(scheme, host, port);
ContentResponse response = client.newRequest(host, port)
.scheme(scheme)
.header(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString())
.send();
Assert.assertEquals(200, response.getStatus());
Destination destinationAfter = client.getDestination(scheme, host, port);
Assert.assertSame(destinationBefore, destinationAfter);
client.setRemoveIdleDestinations(true);
response = client.newRequest(host, port)
.scheme(scheme)
.header(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString())
.send();
Assert.assertEquals(200, response.getStatus());
destinationAfter = client.getDestination(scheme, host, port);
Assert.assertNotSame(destinationBefore, destinationAfter);
}
private Connection timedPoll(Queue<Connection> connections, long time, TimeUnit unit) throws InterruptedException
{
long start = System.nanoTime();
while (unit.toNanos(time) > System.nanoTime() - start)
{
Connection connection = connections.poll();
if (connection != null)
return connection;
TimeUnit.MILLISECONDS.sleep(5);
}
return connections.poll();
}
}