blob: 4aea456412543a4997d3b766ee44e4e8b3e8f666 [file] [log] [blame]
/*
* Copyright (c) 2010-2012 Eike Stepper (Berlin, Germany) and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Andre Dietisheim - initial API and implementation
* Eike Stepper - maintenance
*/
package org.eclipse.net4j.util.tests;
import org.eclipse.net4j.util.concurrent.ExecutorWorkSerializer;
import org.eclipse.net4j.util.io.IOUtil;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A test for {@link ExecutorWorkSerializer}.
*
* @author Andre Dietisheim
*/
public class ExecutorWorkSerializerTest extends AbstractOMTest
{
/** timeout to wait for execution of all work units. */
private static final int WORK_COMPLETION_TIMEOUT = 10000;
/** number of work producer threads. */
private static final int NUM_WORKPRODUCER_THREADS = 10;
/** number of working units to execute. */
private static final int NUM_WORK = 40;
/** the latch to wait on for the execution of all working units. */
private CountDownLatch workConsumedLatch;
/** The number of working units created. */
private AtomicInteger workProduced;
/** The thread pool to execute the work unit producers in. */
private ExecutorService threadPool;
/** The queue worker to submit the work units to. */
private ExecutorWorkSerializer queueWorker;
@Override
public void setUp()
{
workProduced = new AtomicInteger(0);
workConsumedLatch = new CountDownLatch(NUM_WORK);
threadPool = Executors.newFixedThreadPool(NUM_WORKPRODUCER_THREADS);
queueWorker = new ExecutorWorkSerializer(threadPool);
queueWorker.activate();
}
@Override
public void tearDown()
{
queueWorker.dispose();
threadPool.shutdown();
}
/**
* Test that asserts that all submitted workers are executed
*/
public void testAllWorkSubmittedIsConsumed() throws Throwable
{
createWorkProducerThreads(new WorkProducerFactory()
{
public WorkProducer createWorkProducer()
{
return new WorkProducer()
{
@Override
protected Runnable createWork(int id)
{
return new Work(id);
}
};
}
});
waitForAllWorkExecuted();
assertEquals(workProduced.get(), NUM_WORK - workConsumedLatch.getCount());
}
/**
* If the workers throw Exceptions, the QueueWorker stops executing work (deactivates its working thread). Therefore
* the first work unit gets consumed, the rest is not executed any more.
*/
public void testGivenWorkExceptionInWorkAllWorkSubmittedOnlyTheFirstWorkerIsConsumed() throws Throwable
{
createWorkProducerThreads(new WorkProducerFactory()
{
public WorkProducer createWorkProducer()
{
return new WorkProducer()
{
@Override
protected Runnable createWork(int id)
{
return new Work(id)
{
@Override
public void run()
{
super.run();
throw new RuntimeException("dummy exception to simulate an error in executed workers");
}
};
}
};
}
});
waitForAllWorkExecuted();
assertEquals(NUM_WORK, workProduced.get());
}
private void waitForAllWorkExecuted() throws InterruptedException
{
if (!workConsumedLatch.await(WORK_COMPLETION_TIMEOUT, TimeUnit.MILLISECONDS))
{
IOUtil.OUT().println("timeout occured before all workers were executed");
}
}
private void createWorkProducerThreads(WorkProducerFactory factory)
{
for (int i = 0; i < NUM_WORKPRODUCER_THREADS; i++)
{
threadPool.submit(factory.createWorkProducer());
}
}
/**
* A factory that creates work units.
*/
private static interface WorkProducerFactory
{
public WorkProducer createWorkProducer();
}
/**
* A Runnable that creates work units
*/
private abstract class WorkProducer implements Runnable
{
private Random random = new Random();
/**
* Produce work: add work units to the queue worker
*/
public void run()
{
try
{
int currentWorkProduced;
while ((currentWorkProduced = workProduced.getAndIncrement()) < NUM_WORK)
{
queueWorker.addWork(createWork(currentWorkProduced));
Thread.sleep(random.nextInt(1000));
}
// correct last increment
workProduced.decrementAndGet();
IOUtil.OUT().println("work producer " + this + " stopped its production");
}
catch (InterruptedException ex)
{
return;
}
}
/**
* Creates a working unit (runnable).
*
* @param id
* the id
* @return the runnable
*/
protected abstract Runnable createWork(int id);
}
/**
* A simple work unit to be executed in the queueWorker.
*
* @author Andre Dietisheim
*/
class Work implements Runnable
{
private final int id;
private Work(int id)
{
this.id = id;
IOUtil.OUT().println("work unit " + id + " created");
}
public void run()
{
workConsumedLatch.countDown();
IOUtil.OUT().println("work unit " + id + " consumed");
}
}
}