blob: 9ec3dc904699a587c175c657014c3a1d44618541 [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.ConnectionPool;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.LeakTrackingConnectionPool;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.fcgi.client.http.HttpClientTransportOverFCGI;
import org.eclipse.jetty.fcgi.client.http.HttpDestinationOverFCGI;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.LeakTrackingByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.LeakDetector;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.assertThat;
public class HttpClientLoadTest extends AbstractTest
{
private final Logger logger = Log.getLogger(HttpClientLoadTest.class);
private final AtomicLong connectionLeaks = new AtomicLong();
public HttpClientLoadTest(Transport transport)
{
super(transport);
}
@Override
protected ServerConnector newServerConnector(Server server)
{
int cores = Runtime.getRuntime().availableProcessors();
ByteBufferPool byteBufferPool = new ArrayByteBufferPool();
byteBufferPool = new LeakTrackingByteBufferPool(byteBufferPool);
return new ServerConnector(server, null, null, byteBufferPool,
1, Math.min(1, cores / 2), provideServerConnectionFactory(transport));
}
@Override
protected HttpClientTransport provideClientTransport(Transport transport)
{
switch (transport)
{
case HTTP:
case HTTPS:
{
return new HttpClientTransportOverHTTP(1)
{
@Override
public HttpDestination newHttpDestination(Origin origin)
{
return new HttpDestinationOverHTTP(getHttpClient(), origin)
{
@Override
protected ConnectionPool newConnectionPool(HttpClient client)
{
return new LeakTrackingConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
{
@Override
protected void leaked(LeakDetector.LeakInfo leakInfo)
{
super.leaked(leakInfo);
connectionLeaks.incrementAndGet();
}
};
}
};
}
};
}
case FCGI:
{
return new HttpClientTransportOverFCGI(1, false, "")
{
@Override
public HttpDestination newHttpDestination(Origin origin)
{
return new HttpDestinationOverFCGI(getHttpClient(), origin)
{
@Override
protected ConnectionPool newConnectionPool(HttpClient client)
{
return new LeakTrackingConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
{
@Override
protected void leaked(LeakDetector.LeakInfo leakInfo)
{
super.leaked(leakInfo);
connectionLeaks.incrementAndGet();
}
};
}
};
}
};
}
default:
{
return super.provideClientTransport(transport);
}
}
}
@Test
public void testIterative() throws Exception
{
start(new LoadHandler());
client.setByteBufferPool(new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged()));
client.setMaxConnectionsPerDestination(32768);
client.setMaxRequestsQueuedPerDestination(1024 * 1024);
Random random = new Random();
// At least 25k requests to warmup properly (use -XX:+PrintCompilation to verify JIT activity)
int runs = 1;
int iterations = 500;
for (int i = 0; i < runs; ++i)
{
run(random, iterations);
}
// Re-run after warmup
iterations = 5_000;
for (int i = 0; i < runs; ++i)
{
run(random, iterations);
}
System.gc();
ByteBufferPool byteBufferPool = connector.getByteBufferPool();
if (byteBufferPool instanceof LeakTrackingByteBufferPool)
{
LeakTrackingByteBufferPool serverBufferPool = (LeakTrackingByteBufferPool)byteBufferPool;
assertThat("Server BufferPool - leaked acquires", serverBufferPool.getLeakedAcquires(), Matchers.is(0L));
assertThat("Server BufferPool - leaked releases", serverBufferPool.getLeakedReleases(), Matchers.is(0L));
assertThat("Server BufferPool - unreleased", serverBufferPool.getLeakedResources(), Matchers.is(0L));
}
byteBufferPool = client.getByteBufferPool();
if (byteBufferPool instanceof LeakTrackingByteBufferPool)
{
LeakTrackingByteBufferPool clientBufferPool = (LeakTrackingByteBufferPool)byteBufferPool;
assertThat("Client BufferPool - leaked acquires", clientBufferPool.getLeakedAcquires(), Matchers.is(0L));
assertThat("Client BufferPool - leaked releases", clientBufferPool.getLeakedReleases(), Matchers.is(0L));
assertThat("Client BufferPool - unreleased", clientBufferPool.getLeakedResources(), Matchers.is(0L));
}
assertThat("Connection Leaks", connectionLeaks.get(), Matchers.is(0L));
}
@Test
public void testConcurrent() throws Exception
{
start(new LoadHandler());
Random random = new Random();
int runs = 1;
int iterations = 256;
IntStream.range(0, 16).parallel().forEach(i ->
IntStream.range(0, runs).forEach(j ->
run(random, iterations)));
}
private void run(Random random, int iterations)
{
CountDownLatch latch = new CountDownLatch(iterations);
List<String> failures = new ArrayList<>();
int factor = (logger.isDebugEnabled() ? 25 : 1) * 100;
// Dumps the state of the client if the test takes too long
final Thread testThread = Thread.currentThread();
Scheduler.Task task = client.getScheduler().schedule(() ->
{
logger.warn("Interrupting test, it is taking too long");
logger.warn(client.dump());
testThread.interrupt();
}, iterations * factor, TimeUnit.MILLISECONDS);
long begin = System.nanoTime();
for (int i = 0; i < iterations; ++i)
{
test(random, latch, failures);
// test("http", "localhost", "GET", false, false, 64 * 1024, false, latch, failures);
}
Assert.assertTrue(await(latch, iterations, TimeUnit.SECONDS));
long end = System.nanoTime();
task.cancel();
long elapsed = TimeUnit.NANOSECONDS.toMillis(end - begin);
logger.info("{} requests in {} ms, {} req/s", iterations, elapsed, elapsed > 0 ? iterations * 1000 / elapsed : -1);
for (String failure : failures)
System.err.println("FAILED: "+failure);
Assert.assertTrue(failures.toString(), failures.isEmpty());
}
private void test(Random random, final CountDownLatch latch, final List<String> failures)
{
// Choose a random destination
String host = random.nextBoolean() ? "localhost" : "127.0.0.1";
// Choose a random method
HttpMethod method = random.nextBoolean() ? HttpMethod.GET : HttpMethod.POST;
boolean ssl = isTransportSecure();
// Choose randomly whether to close the connection on the client or on the server
boolean clientClose = false;
if (!ssl && random.nextBoolean())
clientClose = true;
boolean serverClose = false;
if (!ssl && random.nextBoolean())
serverClose = true;
int maxContentLength = 64 * 1024;
int contentLength = random.nextInt(maxContentLength) + 1;
test(ssl ? "https" : "http", host, method.asString(), clientClose, serverClose, contentLength, true, latch, failures);
}
private void test(String scheme, String host, String method, boolean clientClose, boolean serverClose, int contentLength, final boolean checkContentLength, final CountDownLatch latch, final List<String> failures)
{
Request request = client.newRequest(host, connector.getLocalPort())
.scheme(scheme)
.method(method);
if (clientClose)
request.header(HttpHeader.CONNECTION, "close");
else if (serverClose)
request.header("X-Close", "true");
switch (method)
{
case "GET":
request.header("X-Download", String.valueOf(contentLength));
break;
case "POST":
request.header("X-Upload", String.valueOf(contentLength));
request.content(new BytesContentProvider(new byte[contentLength]));
break;
}
final CountDownLatch requestLatch = new CountDownLatch(1);
request.send(new Response.Listener.Adapter()
{
private final AtomicInteger contentLength = new AtomicInteger();
@Override
public void onHeaders(Response response)
{
if (checkContentLength)
{
String content = response.getHeaders().get("X-Content");
if (content != null)
contentLength.set(Integer.parseInt(content));
}
}
@Override
public void onContent(Response response, ByteBuffer content)
{
if (checkContentLength)
contentLength.addAndGet(-content.remaining());
}
@Override
public void onComplete(Result result)
{
if (result.isFailed())
{
result.getFailure().printStackTrace();
failures.add("Result failed " + result);
}
if (checkContentLength && contentLength.get() != 0)
failures.add("Content length mismatch " + contentLength);
requestLatch.countDown();
latch.countDown();
}
});
await(requestLatch, 5, TimeUnit.SECONDS);
}
private boolean await(CountDownLatch latch, long time, TimeUnit unit)
{
try
{
return latch.await(time, unit);
}
catch (InterruptedException x)
{
throw new RuntimeException(x);
}
}
private class LoadHandler extends AbstractHandler
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
String method = request.getMethod().toUpperCase(Locale.ENGLISH);
switch (method)
{
case "GET":
{
int contentLength = request.getIntHeader("X-Download");
if (contentLength > 0)
{
response.setHeader("X-Content", String.valueOf(contentLength));
response.getOutputStream().write(new byte[contentLength]);
}
break;
}
case "POST":
{
response.setHeader("X-Content", request.getHeader("X-Upload"));
IO.copy(request.getInputStream(), response.getOutputStream());
break;
}
}
if (Boolean.parseBoolean(request.getHeader("X-Close")))
response.setHeader("Connection", "close");
baseRequest.setHandled(true);
}
}
}