Bug 574652 - fix IAE in "Breakpoints View Update Job"

Serialized execution of breakpoint jobs without SchedulingRule
by using a single job instance with a queue for tasks.

Change-Id: I5c987bd4def191ebb4778c0e68be60badd58a4cf
Signed-off-by: Joerg Kubitz <jkubitz-eclipse@gmx.de>
Reviewed-on: https://git.eclipse.org/r/c/platform/eclipse.platform.debug/+/182814
Tested-by: Platform Bot <platform-bot@eclipse.org>
Reviewed-by: Andrey Loskutov <loskutov@gmx.de>
diff --git a/org.eclipse.debug.tests/src/org/eclipse/debug/tests/AutomatedSuite.java b/org.eclipse.debug.tests/src/org/eclipse/debug/tests/AutomatedSuite.java
index b28236a..5476f0d 100644
--- a/org.eclipse.debug.tests/src/org/eclipse/debug/tests/AutomatedSuite.java
+++ b/org.eclipse.debug.tests/src/org/eclipse/debug/tests/AutomatedSuite.java
@@ -17,6 +17,7 @@
 
 import org.eclipse.debug.tests.breakpoint.BreakpointOrderingTests;
 import org.eclipse.debug.tests.breakpoint.BreakpointTests;
+import org.eclipse.debug.tests.breakpoint.SerialExecutorTest;
 import org.eclipse.debug.tests.console.ConsoleDocumentAdapterTests;
 import org.eclipse.debug.tests.console.ConsoleManagerTests;
 import org.eclipse.debug.tests.console.ConsoleTests;
@@ -69,6 +70,7 @@
 		// BP tests
 		BreakpointOrderingTests.class,
 		BreakpointTests.class,
+		SerialExecutorTest.class,
 		// Note: jface viewer tests were moved out of nightly tests
 		// due to frequent problems on nightly build machines.
 		// (Bug 343308).
diff --git a/org.eclipse.debug.tests/src/org/eclipse/debug/tests/breakpoint/SerialExecutorTest.java b/org.eclipse.debug.tests/src/org/eclipse/debug/tests/breakpoint/SerialExecutorTest.java
new file mode 100644
index 0000000..73e63ef
--- /dev/null
+++ b/org.eclipse.debug.tests/src/org/eclipse/debug/tests/breakpoint/SerialExecutorTest.java
@@ -0,0 +1,163 @@
+/*******************************************************************************
+ *  Copyright (c) 2021 Joerg Kubitz and others.
+ *
+ *  This program and the accompanying materials
+ *  are made available under the terms of the Eclipse Public License 2.0
+ *  which accompanies this distribution, and is available at
+ *  https://www.eclipse.org/legal/epl-2.0/
+ *
+ *  SPDX-License-Identifier: EPL-2.0
+ *
+ *  Contributors:
+ *     Joerg Kubitz - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.debug.tests.breakpoint;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.eclipse.core.runtime.jobs.Job;
+import org.eclipse.debug.internal.ui.model.elements.SerialExecutor;
+import org.eclipse.debug.tests.AbstractDebugTest;
+import org.junit.Ignore;
+import org.junit.Test;
+
+@SuppressWarnings("restriction")
+public class SerialExecutorTest extends AbstractDebugTest {
+	@Override
+	public void tearDown() throws Exception {
+		Job.getJobManager().cancel(this);
+		super.tearDown();
+	}
+
+	@Test
+	public void testSimpleExecution() throws InterruptedException {
+		SerialExecutor serialExecutor = new SerialExecutor("test", this);
+		AtomicInteger executions = new AtomicInteger(0);
+		serialExecutor.schedule(() -> executions.incrementAndGet());
+		Job.getJobManager().join(this, null);
+		assertEquals(1, executions.get());
+		serialExecutor.schedule(() -> executions.incrementAndGet());
+		Job.getJobManager().join(this, null);
+		assertEquals(2, executions.get());
+		serialExecutor.schedule(() -> executions.incrementAndGet());
+		serialExecutor.schedule(() -> executions.incrementAndGet());
+		Job.getJobManager().join(this, null);
+		assertEquals(4, executions.get());
+	}
+
+	@Test
+	public void testSerialExecution() throws InterruptedException {
+		SerialExecutor serialExecutor = new SerialExecutor("test", this);
+		AtomicInteger executions = new AtomicInteger(0);
+		AtomicInteger parallelExecutions = new AtomicInteger(0);
+		final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
+		int RUNS = 20;
+		int WAIT_MILLIS = 2;
+		long start = System.nanoTime();
+		for (int i = 0; i < RUNS; i++) {
+			serialExecutor.schedule(() -> {
+				WriteLock writeLock = rwl.writeLock();
+				if (writeLock.tryLock()) {
+					try {
+						Thread.sleep(WAIT_MILLIS);
+						executions.incrementAndGet();
+					} catch (InterruptedException e) {
+						// interrupt should not happen -> fail test
+						parallelExecutions.incrementAndGet();
+					}
+					writeLock.unlock();
+				} else {
+					// another thread already holding the lock
+					// should not happen -> fail test
+					parallelExecutions.incrementAndGet();
+				}
+				Job[] jobs = Job.getJobManager().find(SerialExecutorTest.this);
+				if (jobs.length != 1) {
+					parallelExecutions.incrementAndGet();
+				}
+			});
+		}
+		Job.getJobManager().join(this, null);
+		Job[] jobs = Job.getJobManager().find(this);
+		assertEquals(0, jobs.length);
+		long stop = System.nanoTime();
+		long millis = (stop - start) / 1000_000;
+		assertEquals(RUNS, executions.get());
+		assertEquals(0, parallelExecutions.get());
+		long minimalMillis = RUNS * WAIT_MILLIS;
+		assertTrue("Test did finish too fast (" + millis + " ms)", millis >= minimalMillis);
+	}
+
+	@Test
+	public void testSchedulingQueue() throws InterruptedException {
+		// Executor has to execute every task. Even when they are faster
+		// scheduled then executed
+		SerialExecutor serialExecutor = new SerialExecutor("test", this);
+		AtomicInteger executions = new AtomicInteger();
+		int RUNS = 20;
+		int WAIT_MILLIS = 2;
+		for (int i = 0; i < RUNS; i++) {
+			serialExecutor.schedule(() -> {
+				try {
+					Thread.sleep(WAIT_MILLIS);
+					executions.incrementAndGet();
+				} catch (InterruptedException e) {
+					// error
+				}
+			});
+		}
+		Job.getJobManager().join(this, null);
+		Job[] jobs = Job.getJobManager().find(this);
+		assertEquals(0, jobs.length);
+		assertEquals(RUNS, executions.get());
+	}
+
+	@Test
+	@Ignore("See https://bugs.eclipse.org/bugs/show_bug.cgi?id=574883")
+	public void testHeavyScheduling() throws InterruptedException {
+		// Executor has to execute every task. Even when they are scheduled fast
+		// and execute fast
+		SerialExecutor serialExecutor = new SerialExecutor("test", this);
+		AtomicInteger executions = new AtomicInteger();
+		int RUNS = 200;
+		for (int i = 0; i < RUNS; i++) {
+			serialExecutor.schedule(() -> executions.incrementAndGet());
+		}
+		Job.getJobManager().join(this, null);
+		Job[] jobs = Job.getJobManager().find(this);
+		assertEquals(0, jobs.length);
+		assertEquals(RUNS, executions.get());
+	}
+
+	@Test
+	public void testJoin() throws InterruptedException {
+		// The last scheduled job has to be done before join() returns
+		for (int run = 0; run < 100; run++) {
+			SerialExecutor serialExecutor = new SerialExecutor("test", this);
+			AtomicInteger executions = new AtomicInteger();
+			int RUNS = 20;
+			int WAIT_MILLIS = 1;
+			for (int i = 0; i < RUNS; i++) {
+				serialExecutor.schedule(() -> {
+					try {
+						Thread.sleep(WAIT_MILLIS);
+						executions.incrementAndGet();
+					} catch (InterruptedException e) {
+						// error
+					}
+				});
+			}
+			Job.getJobManager().join(this, null);
+			Job[] jobs = Job.getJobManager().find(this);
+			assertEquals(0, jobs.length);
+			assertEquals("failed on run " + run, RUNS, executions.get());
+			// does fail on run ~ 40 if the final job.join() is removed.
+		}
+	}
+
+}
\ No newline at end of file
diff --git a/org.eclipse.debug.ui/ui/org/eclipse/debug/internal/ui/model/elements/BreakpointManagerContentProvider.java b/org.eclipse.debug.ui/ui/org/eclipse/debug/internal/ui/model/elements/BreakpointManagerContentProvider.java
index 89e2410..92944bf 100644
--- a/org.eclipse.debug.ui/ui/org/eclipse/debug/internal/ui/model/elements/BreakpointManagerContentProvider.java
+++ b/org.eclipse.debug.ui/ui/org/eclipse/debug/internal/ui/model/elements/BreakpointManagerContentProvider.java
@@ -26,15 +26,11 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.function.Consumer;
 
 import org.eclipse.core.resources.IMarkerDelta;
 import org.eclipse.core.runtime.CoreException;
 import org.eclipse.core.runtime.IAdaptable;
-import org.eclipse.core.runtime.IProgressMonitor;
-import org.eclipse.core.runtime.IStatus;
-import org.eclipse.core.runtime.Status;
-import org.eclipse.core.runtime.jobs.ISchedulingRule;
-import org.eclipse.core.runtime.jobs.Job;
 import org.eclipse.debug.core.DebugPlugin;
 import org.eclipse.debug.core.IBreakpointManager;
 import org.eclipse.debug.core.IBreakpointsListener;
@@ -756,23 +752,6 @@
 	}
 
 	/**
-	 * Scheduling rule to make sure that breakpoint manager listener updates
-	 * are process serially.
-	 */
-	private ISchedulingRule fBreakpointsListenerSchedulingRule = new ISchedulingRule() {
-
-		@Override
-		public boolean isConflicting(ISchedulingRule rule) {
-			return rule == this;
-		}
-
-		@Override
-		public boolean contains(ISchedulingRule rule) {
-			return rule == this;
-		}
-	};
-
-	/**
 	 * A map of input to info data cache
 	 */
 	final private Map<DefaultBreakpointsViewInput, InputData> fInputToData = Collections.synchronizedMap(new InputDataMap<>());
@@ -1019,58 +998,25 @@
 		return EMPTY;
 	}
 
+	final SerialExecutor breakpointUpdater = new SerialExecutor("Breakpoints View Update Job", this); //$NON-NLS-1$
+
+	void updateBreakpointView(Consumer<InputData> action) {
+		breakpointUpdater.schedule(() -> fInputToData.values().forEach(action::accept));
+	}
+
 	@Override
 	public void breakpointsAdded(final IBreakpoint[] breakpoints) {
-		new Job("Breakpoints View Update Job") { //$NON-NLS-1$
-			{
-				setSystem(true);
-				setRule(fBreakpointsListenerSchedulingRule);
-			}
-
-			@Override
-			protected IStatus run(IProgressMonitor monitor) {
-				for (InputData data : fInputToData.values()) {
-					data.breakpointsAdded(breakpoints);
-				}
-				return Status.OK_STATUS;
-			}
-		}.schedule();
+		updateBreakpointView(data -> data.breakpointsAdded(breakpoints));
 	}
 
 	@Override
 	public void breakpointsRemoved(final IBreakpoint[] breakpoints, IMarkerDelta[] deltas) {
-		new Job("Breakpoints View Update Job") { //$NON-NLS-1$
-			{
-				setSystem(true);
-				setRule(fBreakpointsListenerSchedulingRule);
-			}
-
-			@Override
-			protected IStatus run(IProgressMonitor monitor) {
-				for (InputData data : fInputToData.values()) {
-					data.breakpointsRemoved(breakpoints);
-				}
-				return Status.OK_STATUS;
-			}
-		}.schedule();
+		updateBreakpointView(data -> data.breakpointsRemoved(breakpoints));
 	}
 
 	@Override
 	public void breakpointsChanged(final IBreakpoint[] breakpoints, IMarkerDelta[] deltas) {
-		new Job("Breakpoints View Update Job") { //$NON-NLS-1$
-			{
-				setSystem(true);
-				setRule(fBreakpointsListenerSchedulingRule);
-			}
-
-			@Override
-			protected IStatus run(IProgressMonitor monitor) {
-				for (InputData data : fInputToData.values()) {
-					data.breakpointsChanged(breakpoints);
-				}
-				return Status.OK_STATUS;
-			}
-		}.schedule();
+		updateBreakpointView(data -> data.breakpointsChanged(breakpoints));
 	}
 	/**
 	 * Appends the model delta flags to child containers that contains the breakpoint.
diff --git a/org.eclipse.debug.ui/ui/org/eclipse/debug/internal/ui/model/elements/SerialExecutor.java b/org.eclipse.debug.ui/ui/org/eclipse/debug/internal/ui/model/elements/SerialExecutor.java
new file mode 100644
index 0000000..6e46b2a
--- /dev/null
+++ b/org.eclipse.debug.ui/ui/org/eclipse/debug/internal/ui/model/elements/SerialExecutor.java
@@ -0,0 +1,75 @@
+/*****************************************************************
+ * Copyright (c) 2021 Joerg Kubitz and others
+ *
+ * This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License 2.0
+ * which accompanies this distribution, and is available at
+ * https://www.eclipse.org/legal/epl-2.0/
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ *
+ * Contributors:
+ *     Joerg Kubitz - Initial API and implementation
+ *****************************************************************/
+
+package org.eclipse.debug.internal.ui.model.elements;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.eclipse.core.runtime.Assert;
+import org.eclipse.core.runtime.IProgressMonitor;
+import org.eclipse.core.runtime.IStatus;
+import org.eclipse.core.runtime.Status;
+import org.eclipse.core.runtime.jobs.Job;
+
+/**
+ * A singleton SerialExecutor job instance can be used to execute Runnable
+ * objects offered by {@link #schedule(Runnable)} method in order of they
+ * submission, one after another.
+ */
+public final class SerialExecutor extends Job {
+
+	private final ConcurrentLinkedQueue<Runnable> queue;
+	private final Object myFamily;
+
+	/**
+	 * @param jobName descriptive job name
+	 * @param family non null object to control this job execution
+	 **/
+	public SerialExecutor(String jobName, Object family) {
+		super(jobName);
+		Assert.isNotNull(family);
+		this.myFamily = family;
+		this.queue = new ConcurrentLinkedQueue<>();
+		setSystem(true);
+	}
+
+	@Override
+	public boolean belongsTo(Object family) {
+		return myFamily == family;
+	}
+
+	@Override
+	protected IStatus run(IProgressMonitor monitor) {
+		Runnable action = queue.poll();
+		try {
+			if (action != null) {
+				action.run();
+			}
+		} finally {
+			if (!queue.isEmpty()) {
+				// in case actions got faster scheduled then processed reschedule:
+				schedule();
+			}
+		}
+		return Status.OK_STATUS;
+	}
+
+	/**
+	 * Enqueue an action asynchronously.
+	 **/
+	public void schedule(Runnable action) {
+		queue.offer(action);
+		schedule(); // will reschedule if already running
+	}
+}
\ No newline at end of file