blob: aa1b0453170b75af288c910e973556933c69e6a5 [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.servlets;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Exchanger;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.ChannelEndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.StdErrLog;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
public class ThreadStarvationTest
{
@Rule
public TestTracker tracker = new TestTracker();
private Server _server;
@After
public void dispose() throws Exception
{
if (_server != null)
_server.stop();
}
@Test
@Slow
public void testDefaultServletSuccess() throws Exception
{
int maxThreads = 10;
QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads, maxThreads);
threadPool.setDetailedDump(true);
_server = new Server(threadPool);
// Prepare a big file to download.
File directory = MavenTestingUtils.getTargetTestingDir();
Files.createDirectories(directory.toPath());
String resourceName = "resource.bin";
Path resourcePath = Paths.get(directory.getPath(), resourceName);
try (OutputStream output = Files.newOutputStream(resourcePath, StandardOpenOption.CREATE, StandardOpenOption.WRITE))
{
byte[] chunk = new byte[1024];
Arrays.fill(chunk,(byte)'X');
chunk[chunk.length-2]='\r';
chunk[chunk.length-1]='\n';
for (int i = 0; i < 256 * 1024; ++i)
output.write(chunk);
}
final CountDownLatch writePending = new CountDownLatch(1);
ServerConnector connector = new ServerConnector(_server, 0, 1)
{
@Override
protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
{
return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout())
{
@Override
protected void onIncompleteFlush()
{
super.onIncompleteFlush();
writePending.countDown();
}
};
}
};
connector.setIdleTimeout(Long.MAX_VALUE);
_server.addConnector(connector);
ServletContextHandler context = new ServletContextHandler(_server, "/");
context.setResourceBase(directory.toURI().toString());
context.addServlet(DefaultServlet.class, "/*").setAsyncSupported(false);
_server.setHandler(context);
_server.start();
List<Socket> sockets = new ArrayList<>();
for (int i = 0; i < maxThreads*2; ++i)
{
Socket socket = new Socket("localhost", connector.getLocalPort());
sockets.add(socket);
OutputStream output = socket.getOutputStream();
String request = "" +
"GET /" + resourceName + " HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"\r\n";
output.write(request.getBytes(StandardCharsets.UTF_8));
output.flush();
Thread.sleep(100);
}
// Wait for a the servlet to block.
Assert.assertTrue(writePending.await(5, TimeUnit.SECONDS));
long expected = Files.size(resourcePath);
byte[] buffer = new byte[48 * 1024];
List<Exchanger<Long>> totals = new ArrayList<>();
for (Socket socket : sockets)
{
final Exchanger<Long> x = new Exchanger<>();
totals.add(x);
final InputStream input = socket.getInputStream();
new Thread()
{
@Override
public void run()
{
long total=0;
try
{
// look for CRLFCRLF
StringBuilder header = new StringBuilder();
int state=0;
while (state<4 && header.length()<2048)
{
int ch=input.read();
if (ch<0)
break;
header.append((char)ch);
switch(state)
{
case 0:
if (ch=='\r')
state=1;
break;
case 1:
if (ch=='\n')
state=2;
else
state=0;
break;
case 2:
if (ch=='\r')
state=3;
else
state=0;
break;
case 3:
if (ch=='\n')
state=4;
else
state=0;
break;
}
}
while (total<expected)
{
int read=input.read(buffer);
if (read<0)
break;
total+=read;
}
}
catch (IOException e)
{
e.printStackTrace();
}
finally
{
try
{
x.exchange(total);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
}.start();
}
for (Exchanger<Long> x : totals)
{
Long total = x.exchange(-1L,10000,TimeUnit.SECONDS);
Assert.assertEquals(expected,total.longValue());
}
// We could read everything, good.
for (Socket socket : sockets)
socket.close();
}
@Test
public void testFailureStarvation() throws Exception
{
try
{
((StdErrLog)Log.getLogger(HttpChannel.class)).setHideStacks(true);
int acceptors = 0;
int selectors = 1;
int maxThreads = 10;
final int barried=maxThreads-acceptors-selectors;
final CyclicBarrier barrier = new CyclicBarrier(barried);
QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads, maxThreads);
threadPool.setDetailedDump(true);
_server = new Server(threadPool);
ServerConnector connector = new ServerConnector(_server, acceptors, selectors)
{
@Override
protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
{
return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout())
{
@Override
public boolean flush(ByteBuffer... buffers) throws IOException
{
super.flush(buffers[0]);
throw new IOException("TEST FAILURE");
}
};
}
};
connector.setIdleTimeout(Long.MAX_VALUE);
_server.addConnector(connector);
final AtomicInteger count = new AtomicInteger(0);
_server.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
int c=count.getAndIncrement();
try
{
if (c<barried)
{
barrier.await(10,TimeUnit.SECONDS);
}
}
catch (InterruptedException | BrokenBarrierException | TimeoutException e)
{
throw new ServletException(e);
}
baseRequest.setHandled(true);
response.setStatus(200);
response.setContentLength(13);
response.getWriter().print("Hello World!\n");
response.getWriter().flush();
}
});
_server.start();
List<Socket> sockets = new ArrayList<>();
for (int i = 0; i < maxThreads*2; ++i)
{
Socket socket = new Socket("localhost", connector.getLocalPort());
sockets.add(socket);
OutputStream output = socket.getOutputStream();
String request = "" +
"GET / HTTP/1.1\r\n" +
"Host: localhost\r\n" +
// "Connection: close\r\n" +
"\r\n";
output.write(request.getBytes(StandardCharsets.UTF_8));
output.flush();
}
byte[] buffer = new byte[48 * 1024];
List<Exchanger<Integer>> totals = new ArrayList<>();
for (Socket socket : sockets)
{
final Exchanger<Integer> x = new Exchanger<>();
totals.add(x);
final InputStream input = socket.getInputStream();
new Thread()
{
@Override
public void run()
{
int read=0;
try
{
// look for CRLFCRLF
StringBuilder header = new StringBuilder();
int state=0;
while (state<4 && header.length()<2048)
{
int ch=input.read();
if (ch<0)
break;
header.append((char)ch);
switch(state)
{
case 0:
if (ch=='\r')
state=1;
break;
case 1:
if (ch=='\n')
state=2;
else
state=0;
break;
case 2:
if (ch=='\r')
state=3;
else
state=0;
break;
case 3:
if (ch=='\n')
state=4;
else
state=0;
break;
}
}
read=input.read(buffer);
}
catch (IOException e)
{
// e.printStackTrace();
}
finally
{
try
{
x.exchange(read);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
}.start();
}
for (Exchanger<Integer> x : totals)
{
Integer read = x.exchange(-1,10,TimeUnit.SECONDS);
Assert.assertEquals(-1,read.intValue());
}
// We could read everything, good.
for (Socket socket : sockets)
socket.close();
_server.stop();
}
finally
{
((StdErrLog)Log.getLogger(HttpChannel.class)).setHideStacks(false);
}
}
}