Bug 516609 - soft-limit number of job workers in worker pool

Change-Id: I9e5b4f027f13ce6358c7f7ee7a50fbb574b9276b
Signed-off-by: Igor Fedorenko <igor@ifedorenko.com>
Signed-off-by: Andrey Loskutov <loskutov@gmx.de>
diff --git a/bundles/org.eclipse.core.jobs/src/org/eclipse/core/internal/jobs/WorkerPool.java b/bundles/org.eclipse.core.jobs/src/org/eclipse/core/internal/jobs/WorkerPool.java
index ee38ffd..54716cf 100644
--- a/bundles/org.eclipse.core.jobs/src/org/eclipse/core/internal/jobs/WorkerPool.java
+++ b/bundles/org.eclipse.core.jobs/src/org/eclipse/core/internal/jobs/WorkerPool.java
@@ -7,6 +7,7 @@
  *
  *  Contributors:
  *     IBM - Initial API and implementation
+ *     salesforce.com - limit number of sleeping worker threads
  *******************************************************************************/
 package org.eclipse.core.internal.jobs;
 
@@ -33,6 +34,14 @@
 	 * There will always be at least MIN_THREADS workers in the pool.
 	 */
 	private static final int MIN_THREADS = 1;
+
+	/**
+	 * Soft limit on the maximum number of workers in the pool. An idle worker
+	 * is not put back in the pool if the total number of workers is more than
+	 * MAX_THREADS.
+	 */
+	private static final int MAX_THREADS = 50;
+
 	/**
 	 * Use the busy thread count to avoid starting new threads when a living
 	 * thread is just doing house cleaning (notifying listeners, etc).
@@ -199,15 +208,17 @@
 	 * Returns a new job to run. Returns null if the thread should die.
 	 */
 	protected InternalJob startJob(Worker worker) {
-		//if we're above capacity, kill the thread
+		// must endWorker and decrementBusyThreads from the same synchronized block
+		boolean busy;
 		synchronized (this) {
 			if (!manager.isActive()) {
 				//must remove the worker immediately to prevent all threads from expiring
 				endWorker(worker);
 				return null;
 			}
-			//set the thread to be busy now in case of reentrant scheduling
+			// set the thread to be busy now in case of reentrant scheduling
 			incrementBusyThreads();
+			busy = true;
 		}
 		Job job = null;
 		try {
@@ -216,8 +227,17 @@
 			long idleStart = System.currentTimeMillis();
 			while (manager.isActive() && job == null) {
 				long hint = manager.sleepHint();
-				if (hint > 0)
+				if (hint > 0) {
+					synchronized (this) {
+						if (numThreads > MAX_THREADS) {
+							endWorker(worker);
+							decrementBusyThreads();
+							busy = false;
+							return null;
+						}
+					}
 					sleep(Math.min(hint, BEST_BEFORE));
+				}
 				job = manager.startJob(worker);
 				//if we were already idle, and there are still no new jobs, then
 				// the thread can expire
@@ -225,6 +245,8 @@
 					if (job == null && (System.currentTimeMillis() - idleStart > BEST_BEFORE) && (numThreads - busyThreads) > MIN_THREADS) {
 						//must remove the worker immediately to prevent all threads from expiring
 						endWorker(worker);
+						decrementBusyThreads();
+						busy = false;
 						return null;
 					}
 				}
@@ -245,7 +267,7 @@
 			}
 		} finally {
 			//decrement busy thread count if we're not running a job
-			if (job == null)
+			if (job == null && busy)
 				decrementBusyThreads();
 		}
 		return job;
diff --git a/tests/org.eclipse.core.tests.runtime/src/org/eclipse/core/tests/runtime/jobs/AllTests.java b/tests/org.eclipse.core.tests.runtime/src/org/eclipse/core/tests/runtime/jobs/AllTests.java
index 22a1155..86010a3 100644
--- a/tests/org.eclipse.core.tests.runtime/src/org/eclipse/core/tests/runtime/jobs/AllTests.java
+++ b/tests/org.eclipse.core.tests.runtime/src/org/eclipse/core/tests/runtime/jobs/AllTests.java
@@ -45,6 +45,7 @@
 		suite.addTestSuite(Bug_316839.class);
 		suite.addTestSuite(Bug_320329.class);
 		suite.addTest(Bug_412138.suite());
+		suite.addTestSuite(WorkerPoolTest.class);
 		return suite;
 	}
 }
diff --git a/tests/org.eclipse.core.tests.runtime/src/org/eclipse/core/tests/runtime/jobs/WorkerPoolTest.java b/tests/org.eclipse.core.tests.runtime/src/org/eclipse/core/tests/runtime/jobs/WorkerPoolTest.java
new file mode 100644
index 0000000..b7a54e8
--- /dev/null
+++ b/tests/org.eclipse.core.tests.runtime/src/org/eclipse/core/tests/runtime/jobs/WorkerPoolTest.java
@@ -0,0 +1,65 @@
+/*******************************************************************************
+ * Copyright (c) 2017 salesforce.com.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ *     salesforce.com - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.core.tests.runtime.jobs;
+
+import java.util.concurrent.*;
+import junit.framework.TestCase;
+import org.eclipse.core.internal.jobs.Worker;
+import org.eclipse.core.runtime.*;
+import org.eclipse.core.runtime.jobs.Job;
+
+public class WorkerPoolTest extends TestCase {
+
+	public void testIdleWorkerCap() throws Exception {
+		// See org.eclipse.core.internal.jobs.WorkerPool.MAX_THREADS
+		final int MAX_THREADS = 50;
+
+		// number of concurrent jobs
+		final int count = MAX_THREADS * 10;
+
+		// cyclic barrier for count worker threads + one test thread
+		final CyclicBarrier barrier = new CyclicBarrier(count + 1);
+
+		// start count concurrent jobs
+		for (int i = 0; i < count; i++) {
+			new Job("testIdleWorkerCap-" + i) {
+				@Override
+				protected IStatus run(IProgressMonitor monitor) {
+					try {
+						barrier.await();
+					} catch (InterruptedException | BrokenBarrierException e) {
+						return Status.CANCEL_STATUS;
+					}
+					return Status.OK_STATUS;
+				}
+			}.schedule();
+		}
+
+		// wait for jobs to reach the barrier
+		barrier.await(10, TimeUnit.SECONDS);
+
+		// this is the ugly part, wait until worker threads become idle
+		Thread.sleep(5 * 1000L);
+
+		// count worker threads, must be less than WorkerPool.MAX_THREADS
+		Thread[] threads = new Thread[Thread.activeCount() * 2];
+		int tcount = Thread.enumerate(threads);
+		assertTrue("Too many active threads: " + tcount, tcount < threads.length);
+		int wcount = 0;
+		for (int i = 0; i < tcount; i++) {
+			if (threads[i] instanceof Worker) {
+				wcount++;
+			}
+		}
+		assertTrue("Too many worker threads active: " + wcount + ", must be <= " + MAX_THREADS, wcount <= MAX_THREADS);
+	}
+
+}