| /* |
| * Copyright (c) 2015, 2019, 2020, 2022 Eike Stepper (Loehne, Germany) and others. |
| * All rights reserved. This program and the accompanying materials |
| * are made available under the terms of the Eclipse Public License v1.0 |
| * which accompanies this distribution, and is available at |
| * http://www.eclipse.org/legal/epl-v10.html |
| * |
| * Contributors: |
| * Eike Stepper - initial API and implementation |
| */ |
| package org.eclipse.net4j.util.concurrent; |
| |
| import org.eclipse.net4j.internal.util.bundle.OM; |
| import org.eclipse.net4j.util.StringUtil; |
| import org.eclipse.net4j.util.om.OMPlatform; |
| |
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
| |
| /** |
| * @author Eike Stepper |
| * @since 3.6 |
| */ |
| public class ThreadPool extends ThreadPoolExecutor implements RejectedExecutionHandler |
| { |
| public static final String DEFAULT_THREAD_GROUP_NAME = ExecutorServiceFactory.DEFAULT_THREAD_GROUP_NAME; |
| |
| public static final int DEFAULT_CORE_POOL_SIZE = 10; |
| |
| public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE; |
| |
| public static final long DEFAULT_KEEP_ALIVE_SECONDS = 60; |
| |
| private static final Class<?> LINKED_BLOCKING_DEQUE_CLASS; |
| |
| private static final Method OFFER_LAST_METHOD; |
| |
| private static final int NO_DEADLOCK_DETECTION = 0; |
| |
| private static final int deadlockDetectionInterval = OMPlatform.INSTANCE.getProperty("org.eclipse.net4j.util.concurrent.ThreadPool.deadlockDetectionInterval", |
| NO_DEADLOCK_DETECTION); |
| |
| private final AtomicInteger runningTasks = new AtomicInteger(); |
| |
| private final AtomicInteger runTasks = new AtomicInteger(); |
| |
| private int lastRunTasks = -1; |
| |
| private RejectedExecutionHandler userHandler; |
| |
| public ThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveSeconds, ThreadFactory threadFactory) |
| { |
| super(corePoolSize, maximumPoolSize, keepAliveSeconds, TimeUnit.SECONDS, createWorkQueue(), threadFactory); |
| ((WorkQueue)getQueue()).setThreadPool(this); |
| |
| // Call super setter because the setter in this class is overridden to set the userHandler field. |
| super.setRejectedExecutionHandler(this); |
| |
| if (deadlockDetectionInterval != NO_DEADLOCK_DETECTION) |
| { |
| DeadlockDetector.INSTANCE.register(this); |
| } |
| } |
| |
| @Override |
| public void setRejectedExecutionHandler(RejectedExecutionHandler handler) |
| { |
| userHandler = handler; |
| } |
| |
| @Override |
| public RejectedExecutionHandler getRejectedExecutionHandler() |
| { |
| return userHandler; |
| } |
| |
| @Override |
| public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) |
| { |
| WorkQueue queue = (WorkQueue)getQueue(); |
| if (!queue.offerLast(task)) |
| { |
| if (userHandler != null) |
| { |
| userHandler.rejectedExecution(task, this); |
| } |
| else |
| { |
| OM.LOG.error("Thread pool has rejected the task " + task); |
| } |
| } |
| } |
| |
| @Override |
| public int getActiveCount() |
| { |
| return runningTasks.get(); |
| } |
| |
| @Override |
| protected void beforeExecute(Thread worker, Runnable task) |
| { |
| runningTasks.incrementAndGet(); |
| incrementRunTasks(); |
| } |
| |
| @Override |
| protected void afterExecute(Runnable task, Throwable ex) |
| { |
| runningTasks.decrementAndGet(); |
| } |
| |
| /** |
| * @since 3.9 |
| */ |
| protected void potentialDeadlockDetected() |
| { |
| BlockingQueue<Runnable> queue = getQueue(); |
| int size = queue.size(); |
| if (size > 0) |
| { |
| String poolName = toString(); |
| |
| ExecutorService executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 100L, TimeUnit.MICROSECONDS, new SynchronousQueue<Runnable>()); |
| Runnable task; |
| boolean first = true; |
| |
| while ((task = queue.poll()) != null) |
| { |
| if (first) |
| { |
| OM.LOG.warn("Potential deadlock detected in " + poolName + ". Executing " + size + " tasks..."); |
| first = false; |
| } |
| |
| incrementRunTasks(); |
| executor.execute(task); |
| } |
| } |
| } |
| |
| private void incrementRunTasks() |
| { |
| int current; |
| int next; |
| |
| do |
| { |
| current = runTasks.get(); |
| next = current == Integer.MAX_VALUE ? 0 : current + 1; |
| } while (!runTasks.compareAndSet(current, next)); |
| } |
| |
| /** |
| * This method decides whether a new task will be added to the {@link WorkQueue} (and eventually picked up by |
| * an existing worker), or assigned to a new worker. |
| * <p> |
| * It is called from {@link WorkQueue#offer(Runnable)}, which, in turn, is called from {@link #execute(Runnable)}. |
| * When this method is called the core workers are already created, i.e., {@link #getPoolSize() pool size} >= |
| * {@link #getCorePoolSize() core pool size}. |
| * <p> |
| * Note that, due to the unsynchronized calls to the various metric-providing methods, |
| * it can happen that the thread pool will not be able to actually create a new worker at the time it is supposed |
| * to do it. In this case the {@link #rejectedExecution(Runnable, ThreadPoolExecutor) rejectedExecution()} method |
| * will be called, which, as a last resort, adds the new task to the work queue (even though here |
| * it was decided not to do so). |
| */ |
| private boolean shallEnqueue() |
| { |
| int poolSize = getPoolSize(); |
| if (getQueue().size() < poolSize - getActiveCount()) |
| { |
| // More inactive workers exist than there are tasks in the queue; the task should be enqueued. |
| return true; |
| } |
| |
| if (poolSize >= getMaximumPoolSize()) |
| { |
| // Pool is full; the task should be enqueued. |
| return true; |
| } |
| |
| // A new worker should be created. |
| return false; |
| } |
| |
| public static ThreadPool create() |
| { |
| return create(null, DEFAULT_CORE_POOL_SIZE, DEFAULT_MAXIMUM_POOL_SIZE, DEFAULT_KEEP_ALIVE_SECONDS); |
| } |
| |
| public static ThreadPool create(String description) |
| { |
| String threadGroupName = null; |
| int corePoolSize = DEFAULT_CORE_POOL_SIZE; |
| int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE; |
| long keepAliveSeconds = DEFAULT_KEEP_ALIVE_SECONDS; |
| |
| if (!StringUtil.isEmpty(description)) |
| { |
| String[] tokens = description.split(":"); |
| if (tokens.length > 0) |
| { |
| threadGroupName = tokens[0]; |
| if (tokens.length > 1) |
| { |
| try |
| { |
| corePoolSize = Integer.parseInt(tokens[1]); |
| } |
| catch (NumberFormatException ex) |
| { |
| //$FALL-THROUGH$ |
| } |
| |
| if (tokens.length > 2) |
| { |
| try |
| { |
| maximumPoolSize = Integer.parseInt(tokens[2]); |
| } |
| catch (NumberFormatException ex) |
| { |
| //$FALL-THROUGH$ |
| } |
| |
| if (tokens.length > 3) |
| { |
| try |
| { |
| keepAliveSeconds = Long.parseLong(tokens[3]); |
| } |
| catch (NumberFormatException ex) |
| { |
| //$FALL-THROUGH$ |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| return create(threadGroupName, corePoolSize, maximumPoolSize, keepAliveSeconds); |
| } |
| |
| public static ThreadPool create(String threadGroupName, int corePoolSize, int maximumPoolSize, long keepAliveSeconds) |
| { |
| ThreadFactory threadFactory = createThreadFactory(threadGroupName); |
| return new ThreadPool(corePoolSize, maximumPoolSize, keepAliveSeconds, threadFactory); |
| } |
| |
| private static ThreadFactory createThreadFactory(String threadGroupName) |
| { |
| if (threadGroupName == null) |
| { |
| threadGroupName = DEFAULT_THREAD_GROUP_NAME; |
| } |
| |
| ThreadGroup threadGroup = new ThreadGroup(threadGroupName); |
| |
| ThreadFactory threadFactory = new ThreadFactory() |
| { |
| private final AtomicInteger num = new AtomicInteger(); |
| |
| @Override |
| public Thread newThread(Runnable task) |
| { |
| Thread thread = new Thread(threadGroup, task, threadGroup.getName() + "-thread-" + num.incrementAndGet()); |
| thread.setDaemon(true); |
| return thread; |
| } |
| }; |
| |
| return threadFactory; |
| } |
| |
| private static WorkQueue createWorkQueue() |
| { |
| if (LINKED_BLOCKING_DEQUE_CLASS != null) |
| { |
| try |
| { |
| return new WorkQueueJRE16(); |
| } |
| catch (Throwable ex) |
| { |
| //$FALL-THROUGH$ |
| } |
| } |
| |
| return new WorkQueueJRE15(); |
| } |
| |
| static |
| { |
| Class<?> c = null; |
| Method m = null; |
| |
| try |
| { |
| c = Class.forName("java.util.concurrent.LinkedBlockingDeque"); |
| m = c.getMethod("offerLast", Object.class); |
| } |
| catch (Throwable ex) |
| { |
| c = null; |
| m = null; |
| } |
| |
| LINKED_BLOCKING_DEQUE_CLASS = c; |
| OFFER_LAST_METHOD = m; |
| } |
| |
| /** |
| * @author Eike Stepper |
| */ |
| private interface WorkQueue extends BlockingQueue<Runnable> |
| { |
| public void setThreadPool(ThreadPool threadPool); |
| |
| public boolean offerLast(Runnable task); |
| |
| } |
| |
| /** |
| * @author Eike Stepper |
| */ |
| private static final class WorkQueueJRE15 extends LinkedBlockingQueue<Runnable> implements WorkQueue |
| { |
| private static final long serialVersionUID = 1L; |
| |
| private ThreadPool threadPool; |
| |
| public WorkQueueJRE15() |
| { |
| } |
| |
| @Override |
| public void setThreadPool(ThreadPool threadPool) |
| { |
| this.threadPool = threadPool; |
| } |
| |
| @Override |
| public boolean offerLast(Runnable task) |
| { |
| // Call the super method because the method in this class is overridden. |
| return super.offer(task); |
| } |
| |
| @Override |
| public boolean offer(Runnable task) |
| { |
| if (threadPool.shallEnqueue()) |
| { |
| return super.offer(task); |
| } |
| |
| return false; |
| } |
| } |
| |
| /** |
| * @author Eike Stepper |
| */ |
| private static final class WorkQueueJRE16 extends AbstractQueue<Runnable> implements WorkQueue |
| { |
| private final BlockingQueue<Runnable> delegate = createDelegate(); |
| |
| private ThreadPool threadPool; |
| |
| public WorkQueueJRE16() |
| { |
| } |
| |
| @Override |
| public void setThreadPool(ThreadPool threadPool) |
| { |
| this.threadPool = threadPool; |
| } |
| |
| @Override |
| public boolean offerLast(Runnable task) |
| { |
| try |
| { |
| // Call the LinkedBlockingDeque.offerLast() method because it does NOT call |
| // the overridden offer() method in this class. |
| return (Boolean)OFFER_LAST_METHOD.invoke(delegate, task); |
| } |
| catch (Throwable ex) |
| { |
| return false; |
| } |
| } |
| |
| @Override |
| public boolean offer(Runnable task) |
| { |
| if (threadPool.shallEnqueue()) |
| { |
| return delegate.offer(task); |
| } |
| |
| return false; |
| } |
| |
| @Override |
| public boolean offer(Runnable taske, long timeout, TimeUnit unit) throws InterruptedException |
| { |
| return delegate.offer(taske, timeout, unit); |
| } |
| |
| @Override |
| public int size() |
| { |
| return delegate.size(); |
| } |
| |
| @Override |
| public Runnable take() throws InterruptedException |
| { |
| return delegate.take(); |
| } |
| |
| @Override |
| public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException |
| { |
| return delegate.poll(timeout, unit); |
| } |
| |
| @Override |
| public Runnable poll() |
| { |
| return delegate.poll(); |
| } |
| |
| @Override |
| public Iterator<Runnable> iterator() |
| { |
| return delegate.iterator(); |
| } |
| |
| @Override |
| public Runnable peek() |
| { |
| return delegate.peek(); |
| } |
| |
| @Override |
| public void put(Runnable task) throws InterruptedException |
| { |
| delegate.put(task); |
| } |
| |
| @Override |
| public int remainingCapacity() |
| { |
| return delegate.remainingCapacity(); |
| } |
| |
| @Override |
| public int drainTo(Collection<? super Runnable> c) |
| { |
| return delegate.drainTo(c); |
| } |
| |
| @Override |
| public int drainTo(Collection<? super Runnable> c, int maxElements) |
| { |
| return delegate.drainTo(c, maxElements); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private static BlockingQueue<Runnable> createDelegate() |
| { |
| try |
| { |
| Constructor<?> constructor = LINKED_BLOCKING_DEQUE_CLASS.getConstructor(); |
| return (BlockingQueue<Runnable>)constructor.newInstance(); |
| } |
| catch (Throwable ex) |
| { |
| //$FALL-THROUGH$ |
| } |
| |
| return new LinkedBlockingQueue<>(); |
| } |
| } |
| |
| /** |
| * @author Eike Stepper |
| */ |
| private static final class DeadlockDetector extends Worker |
| { |
| public static final DeadlockDetector INSTANCE = new DeadlockDetector(); |
| |
| private volatile ArrayList<ThreadPool> pools = new ArrayList<>(); |
| |
| private DeadlockDetector() |
| { |
| setDaemon(true); |
| activate(); |
| } |
| |
| public void register(ThreadPool pool) |
| { |
| ArrayList<ThreadPool> newList = new ArrayList<>(pools); |
| newList.add(pool); |
| pools = newList; |
| } |
| |
| private void unregister(ThreadPool pool) |
| { |
| ArrayList<ThreadPool> newList = new ArrayList<>(pools); |
| newList.remove(pool); |
| pools = newList; |
| } |
| |
| @Override |
| protected String getThreadName() |
| { |
| return DeadlockDetector.class.getSimpleName(); |
| } |
| |
| @Override |
| protected void work(WorkContext context) throws Exception |
| { |
| ArrayList<ThreadPool> list = pools; |
| int size = list.size(); |
| |
| for (int i = 0; i < size; i++) |
| { |
| ThreadPool pool = list.get(i); |
| if (pool.isShutdown()) |
| { |
| unregister(pool); |
| continue; |
| } |
| |
| work(pool); |
| } |
| |
| context.nextWork(deadlockDetectionInterval); |
| } |
| |
| private void work(ThreadPool pool) |
| { |
| int lastRunTasks = pool.runTasks.get(); |
| if (lastRunTasks != pool.lastRunTasks) |
| { |
| pool.lastRunTasks = lastRunTasks; |
| } |
| else |
| { |
| if (pool.getPoolSize() == pool.getMaximumPoolSize()) |
| { |
| pool.potentialDeadlockDetected(); |
| } |
| } |
| } |
| } |
| } |