Bug 528116 - [osgi R7] update the latest R7 OSGi API
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java
index ef8bb14..cf16c21 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java
@@ -33,6 +33,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
@@ -64,6 +65,7 @@
 import org.osgi.util.function.Predicate;
 import org.osgi.util.promise.Deferred;
 import org.osgi.util.promise.Promise;
+import org.osgi.util.promise.PromiseFactory;
 import org.osgi.util.promise.TimeoutException;
 import org.osgi.util.pushstream.PushEvent.EventType;
 
@@ -77,7 +79,7 @@
 	
 	protected final PushStreamProvider								psp;
 	
-	protected final PushStreamExecutors								executors;
+	protected final PromiseFactory									promiseFactory;
 
 	protected final AtomicReference<State> closed = new AtomicReference<>(BUILDING);
 	
@@ -91,9 +93,9 @@
 	protected abstract void upstreamClose(PushEvent< ? > close);
 
 	AbstractPushStreamImpl(PushStreamProvider psp,
-			PushStreamExecutors executors) {
+			PromiseFactory promiseFactory) {
 		this.psp = psp;
-		this.executors = executors;
+		this.promiseFactory = promiseFactory;
 	}
 
 	protected long handleEvent(PushEvent< ? extends T> event) {
@@ -216,7 +218,7 @@
 	@Override
 	public PushStream<T> filter(Predicate< ? super T> predicate) {
 		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
-				psp, executors, this);
+				psp, promiseFactory, this);
 		updateNext((event) -> {
 			try {
 				if (!event.isTerminal()) {
@@ -239,7 +241,7 @@
 	public <R> PushStream<R> map(Function< ? super T, ? extends R> mapper) {
 		
 		AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>(
-				psp, executors, this);
+				psp, promiseFactory, this);
 		updateNext(event -> {
 			try {
 				if (!event.isTerminal()) {
@@ -257,10 +259,62 @@
 	}
 
 	@Override
+	public <R> PushStream<R> asyncMap(int n, int delay,
+			Function< ? super T,Promise< ? extends R>> mapper)
+			throws IllegalArgumentException, NullPointerException {
+
+		AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>(
+				psp, promiseFactory, this);
+		Semaphore s = new Semaphore(n);
+		updateNext(event -> {
+			try {
+				if (event.isTerminal()) {
+					s.acquire(n);
+					eventStream.close(event.nodata());
+					return ABORT;
+				}
+
+				s.acquire(1);
+
+				Promise< ? extends R> p = mapper.apply(event.getData());
+				p.thenAccept(d -> promiseFactory.executor().execute(() -> {
+					try {
+							if (eventStream
+									.handleEvent(PushEvent.data(d)) < 0) {
+								PushEvent<R> close = PushEvent.close();
+								eventStream.close(close);
+								// Upstream close is needed as we have no direct
+								// backpressure
+								upstreamClose(close);
+							}
+					} finally {
+						s.release();
+					}
+				})).onFailure(t -> promiseFactory.executor().execute(() -> {
+					PushEvent<T> error = PushEvent.error(t);
+					close(error);
+					// Upstream close is needed as we have no direct
+					// backpressure
+					upstreamClose(error);
+				}));
+
+				// The number active before was one less than the active number
+				int activePromises = Math.max(0, n - s.availablePermits() - 1);
+				return (activePromises + s.getQueueLength()) * delay;
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+
+		return eventStream;
+	}
+
+	@Override
 	public <R> PushStream<R> flatMap(
 			Function< ? super T, ? extends PushStream< ? extends R>> mapper) {
 		AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>(
-				psp, executors, this);
+				psp, promiseFactory, this);
 
 		PushEventConsumer<R> consumer = e -> {
 			switch (e.getType()) {
@@ -320,7 +374,7 @@
 	public PushStream<T> sorted(Comparator< ? super T> comparator) {
 		List<T> list = Collections.synchronizedList(new ArrayList<>());
 		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
-				psp, executors, this);
+				psp, promiseFactory, this);
 		updateNext(event -> {
 			try {
 				switch(event.getType()) {
@@ -355,7 +409,7 @@
 			throw new IllegalArgumentException("The limit must be greater than zero");
 		}
 		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
-				psp, executors, this);
+				psp, promiseFactory, this);
 		AtomicLong counter = new AtomicLong(maxSize);
 		updateNext(event -> {
 			try {
@@ -381,11 +435,12 @@
 	@Override
 	public PushStream<T> limit(Duration maxTime) {
 		
-		Runnable start = () -> executors.schedule(() -> close(),
+		Runnable start = () -> promiseFactory.scheduledExecutor().schedule(
+				() -> close(),
 				maxTime.toNanos(), NANOSECONDS);
 
 		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<T>(
-				psp, executors, this) {
+				psp, promiseFactory, this) {
 			@Override
 			protected void beginning() {
 				start.run();
@@ -409,11 +464,12 @@
 		long timeout = maxTime.toNanos();
 
 		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<T>(
-				psp, executors, this) {
+				psp, promiseFactory, this) {
 			@Override
 			protected void beginning() {
 				lastTime.set(System.nanoTime());
-				executors.schedule(() -> check(lastTime, timeout), timeout,
+				promiseFactory.scheduledExecutor().schedule(
+						() -> check(lastTime, timeout), timeout,
 						NANOSECONDS);
 			}
 		};
@@ -434,7 +490,8 @@
 		long elapsed = now - lastTime.get();
 
 		if (elapsed < timeout) {
-			executors.schedule(() -> check(lastTime, timeout),
+			promiseFactory.scheduledExecutor().schedule(
+					() -> check(lastTime, timeout),
 					timeout - elapsed, NANOSECONDS);
 		} else {
 			PushEvent<T> error = PushEvent.error(new TimeoutException());
@@ -451,7 +508,7 @@
 					"The number to skip must be greater than or equal to zero");
 		}
 		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
-				psp, executors, this);
+				psp, promiseFactory, this);
 		AtomicLong counter = new AtomicLong(n);
 		updateNext(event -> {
 			try {
@@ -475,7 +532,8 @@
 	@Override
 	public PushStream<T> fork(int n, int delay, Executor ex) {
 		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
-				psp, new PushStreamExecutors(ex, executors.scheduledExecutor()),
+				psp, new PromiseFactory(Objects.requireNonNull(ex),
+						promiseFactory.scheduledExecutor()),
 				this);
 		Semaphore s = new Semaphore(n);
 		updateNext(event -> {
@@ -538,7 +596,7 @@
 	public PushStream<T> merge(
 			PushEventSource< ? extends T> source) {
 		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
-				psp, executors, this);
+				psp, promiseFactory, this);
 		AtomicInteger count = new AtomicInteger(2);
 		PushEventConsumer<T> consumer = event -> {
 			try {
@@ -620,7 +678,7 @@
 
 		@SuppressWarnings("resource")
 		AbstractPushStreamImpl<T> eventStream = new AbstractPushStreamImpl<T>(
-				psp, executors) {
+				psp, promiseFactory) {
 			@Override
 			protected boolean begin() {
 				if (closed.compareAndSet(BUILDING, STARTED)) {
@@ -660,7 +718,7 @@
 		Predicate<? super T>[] tests = Arrays.copyOf(predicates, predicates.length);
 		AbstractPushStreamImpl<T>[] rsult = new AbstractPushStreamImpl[tests.length];
 		for(int i = 0; i < tests.length; i++) {
-			rsult[i] = new IntermediatePushStreamImpl<>(psp, executors, this);
+			rsult[i] = new IntermediatePushStreamImpl<>(psp, promiseFactory, this);
 		}
 
 		Boolean[] array = new Boolean[tests.length];
@@ -716,7 +774,7 @@
 	@Override
 	public PushStream<T> sequential() {
 		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
-				psp, executors, this);
+				psp, promiseFactory, this);
 		Lock lock = new ReentrantLock();
 		updateNext((event) -> {
 			try {
@@ -738,7 +796,7 @@
 	public <R> PushStream<R> coalesce(
 			Function< ? super T,Optional<R>> accumulator) {
 		AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>(
-				psp, executors, this);
+				psp, promiseFactory, this);
 		updateNext((event) -> {
 			try {
 				if (!event.isTerminal()) {
@@ -785,7 +843,7 @@
 
 		@SuppressWarnings("resource")
 		AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<R>(
-				psp, executors, this) {
+				psp, promiseFactory, this) {
 			@Override
 			protected void beginning() {
 				init.run();
@@ -857,7 +915,7 @@
 	@Override
 	public <R> PushStream<R> window(Duration time,
 			Function<Collection<T>,R> f) {
-		return window(time, executors.executor(), f);
+		return window(time, promiseFactory.executor(), f);
 	}
 
 	@Override
@@ -876,7 +934,7 @@
 	public <R> PushStream<R> window(Supplier<Duration> time,
 			IntSupplier maxEvents,
 			BiFunction<Long,Collection<T>,R> f) {
-		return window(time, maxEvents, executors.executor(), f);
+		return window(time, maxEvents, promiseFactory.executor(), f);
 	}
 
 	@Override
@@ -902,7 +960,7 @@
 
 				long windowSize = time.get().toNanos();
 				previousWindowSize.set(windowSize);
-				executors.schedule(
+				promiseFactory.scheduledExecutor().schedule(
 						getWindowTask(p, f, time, maxEvents, lock, count,
 								queueRef, timestamp, counter,
 								previousWindowSize, ex),
@@ -914,7 +972,8 @@
 
 		@SuppressWarnings("resource")
 		AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<R>(
-				psp, new PushStreamExecutors(ex, executors.scheduledExecutor()),
+				psp, new PromiseFactory(Objects.requireNonNull(ex),
+						promiseFactory.scheduledExecutor()),
 				this) {
 			@Override
 			protected void beginning() {
@@ -968,7 +1027,7 @@
 					long nextWindow = time.get().toNanos();
 					long backpressure = previousWindowSize.getAndSet(nextWindow)
 							- elapsed;
-					executors.schedule(
+					promiseFactory.scheduledExecutor().schedule(
 							getWindowTask(eventStream, f, time, maxEvents, lock,
 									newCount, queueRef, timestamp, counter,
 									previousWindowSize, ex),
@@ -1194,7 +1253,7 @@
 			long nextWindow = time.get().toNanos();
 			previousWindowSize.set(nextWindow);
 			queueRef.set(getQueueForInternalBuffering(maxEvents.getAsInt()));
-			executors.schedule(
+			promiseFactory.scheduledExecutor().schedule(
 					getWindowTask(eventStream, f, time, maxEvents, lock,
 							expectedCounter + 1, queueRef, timestamp, counter,
 							previousWindowSize, executor),
@@ -1226,7 +1285,7 @@
 	@Override
 	public PushStream<T> adjustBackPressure(LongUnaryOperator adjustment) {
 		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
-				psp, executors, this);
+				psp, promiseFactory, this);
 		updateNext(event -> {
 			try {
 				long bp = eventStream.handleEvent(event);
@@ -1247,7 +1306,7 @@
 	public PushStream<T> adjustBackPressure(
 			ToLongBiFunction<T,Long> adjustment) {
 		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
-				psp, executors, this);
+				psp, promiseFactory, this);
 		updateNext(event -> {
 			try {
 				long bp = eventStream.handleEvent(event);
@@ -1268,7 +1327,7 @@
 
 	@Override
 	public Promise<Void> forEach(Consumer< ? super T> action) {
-		Deferred<Void> d = executors.deferred();
+		Deferred<Void> d = promiseFactory.deferred();
 		updateNext((event) -> {
 				try {
 					switch(event.getType()) {
@@ -1307,7 +1366,7 @@
 
 	@Override
 	public Promise<T> reduce(T identity, BinaryOperator<T> accumulator) {
-		Deferred<T> d = executors.deferred();
+		Deferred<T> d = promiseFactory.deferred();
 		AtomicReference<T> iden = new AtomicReference<T>(identity);
 
 		updateNext(event -> {
@@ -1336,7 +1395,7 @@
 
 	@Override
 	public Promise<Optional<T>> reduce(BinaryOperator<T> accumulator) {
-		Deferred<Optional<T>> d = executors.deferred();
+		Deferred<Optional<T>> d = promiseFactory.deferred();
 		AtomicReference<T> iden = new AtomicReference<T>(null);
 
 		updateNext(event -> {
@@ -1366,7 +1425,7 @@
 
 	@Override
 	public <U> Promise<U> reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) {
-		Deferred<U> d = executors.deferred();
+		Deferred<U> d = promiseFactory.deferred();
 		AtomicReference<U> iden = new AtomicReference<>(identity);
 
 		updateNext(event -> {
@@ -1397,7 +1456,7 @@
 	public <R, A> Promise<R> collect(Collector<? super T, A, R> collector) {
 		A result = collector.supplier().get();
 		BiConsumer<A, ? super T> accumulator = collector.accumulator();
-		Deferred<R> d = executors.deferred();
+		Deferred<R> d = promiseFactory.deferred();
 		PushEventConsumer<T> consumer;
 
 		if (collector.characteristics()
@@ -1464,7 +1523,7 @@
 
 	@Override
 	public Promise<Long> count() {
-		Deferred<Long> d = executors.deferred();
+		Deferred<Long> d = promiseFactory.deferred();
 		LongAdder counter = new LongAdder();
 		updateNext((event) -> {
 				try {
@@ -1510,7 +1569,7 @@
 
 	@Override
 	public Promise<Optional<T>> findFirst() {
-		Deferred<Optional<T>> d = executors.deferred();
+		Deferred<Optional<T>> d = promiseFactory.deferred();
 		updateNext((event) -> {
 				try {
 					Optional<T> o = null;
@@ -1544,7 +1603,7 @@
 
 	@Override
 	public Promise<Long> forEachEvent(PushEventConsumer< ? super T> action) {
-		Deferred<Long> d = executors.deferred();
+		Deferred<Long> d = promiseFactory.deferred();
 		LongAdder la = new LongAdder();
 		updateNext((event) -> {
 			try {
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java
index 7a8d163..dfb9eee 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java
@@ -25,6 +25,8 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
+import org.osgi.util.promise.PromiseFactory;
+
 class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
 		extends UnbufferedPushStreamImpl<T,U> implements PushStream<T> {
 	
@@ -46,11 +48,11 @@
 	private final int					parallelism;
 
 	BufferedPushStreamImpl(PushStreamProvider psp,
-			PushStreamExecutors executors, U eventQueue, int parallelism,
+			PromiseFactory promiseFactory, U eventQueue, int parallelism,
 			QueuePolicy<T,U> queuePolicy,
 			PushbackPolicy<T,U> pushbackPolicy,
 			Function<PushEventConsumer<T>,AutoCloseable> connector) {
-		super(psp, executors, connector);
+		super(psp, promiseFactory, connector);
 		this.eventQueue = eventQueue;
 		this.parallelism = parallelism;
 		this.semaphore = new Semaphore(parallelism);
@@ -85,7 +87,7 @@
 	}
 
 	private void startWorker() {
-		executors.execute(() -> {
+		promiseFactory.executor().execute(() -> {
 			try {
 				PushEvent< ? extends T> event;
 				while ((event = eventQueue.poll()) != null) {
@@ -99,7 +101,8 @@
 						close();
 						return;
 					} else if(backpressure > 0) {
-						executors.schedule(this::startWorker, backpressure,
+						promiseFactory.scheduledExecutor().schedule(
+								this::startWorker, backpressure,
 								MILLISECONDS);
 						return;
 					}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java
index 6787512..c939c4a 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java
@@ -18,15 +18,17 @@
 
 import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.*;
 
+import org.osgi.util.promise.PromiseFactory;
+
 class IntermediatePushStreamImpl<T> extends AbstractPushStreamImpl<T>
 		implements PushStream<T> {
 	
 	private final AbstractPushStreamImpl< ? > previous;
 	
 	IntermediatePushStreamImpl(PushStreamProvider psp,
-			PushStreamExecutors executors,
+			PromiseFactory promiseFactory,
 			AbstractPushStreamImpl< ? > previous) {
-		super(psp, executors);
+		super(psp, promiseFactory);
 		this.previous = previous;
 	}
 
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java
index 57a3bb9..574d655 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java
@@ -90,7 +90,7 @@
 	 * @throws IllegalStateException if this event is not an
 	 *             {@link EventType#ERROR} event.
 	 */
-	public Exception getFailure() throws IllegalStateException {
+	public Throwable getFailure() throws IllegalStateException {
 		throw new IllegalStateException(
 				"Not an ERROR event, the event type is " + getType());
 	}
@@ -119,11 +119,11 @@
 	 * Create a new error event.
 	 * 
 	 * @param <T> The payload type.
-	 * @param e The error.
+	 * @param t The error.
 	 * @return A new error event with the specified error.
 	 */
-	public static <T> PushEvent<T> error(Exception e) {
-		return new ErrorEvent<T>(e);
+	public static <T> PushEvent<T> error(Throwable t) {
+		return new ErrorEvent<T>(t);
 	}
 
 	/**
@@ -182,14 +182,14 @@
 	}
 
 	static final class ErrorEvent<T> extends PushEvent<T> {
-		private final Exception error;
+		private final Throwable error;
 
-		ErrorEvent(Exception error) {
+		ErrorEvent(Throwable error) {
 			this.error = error;
 		}
 
 		@Override
-		public Exception getFailure() {
+		public Throwable getFailure() {
 			return error;
 		}
 
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java
index 6d5953e..071c9ec 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java
@@ -104,6 +104,26 @@
 	<R> PushStream<R> map(Function< ? super T, ? extends R> mapper);
 
 	/**
+	 * Asynchronously map the payload values. The mapping function returns a
+	 * Promise representing the asynchronous mapping operation.
+	 * <p>
+	 * The PushStream limits the number of concurrently running mapping
+	 * operations, and returns back pressure based on the number of existing
+	 * queued operations.
+	 * 
+	 * @param n number of simultaneous promises to use
+	 * @param delay Nr of ms/promise that is queued back pressure
+	 * @param mapper The mapping function
+	 * @return Builder style (can be a new or the same object)
+	 * @throws IllegalArgumentException if the number of threads is &lt; 1 or
+	 *             the delay is &lt; 0
+	 * @throws NullPointerException if the mapper is null
+	 */
+	<R> PushStream<R> asyncMap(int n, int delay,
+			Function< ? super T,Promise< ? extends R>> mapper)
+			throws IllegalArgumentException, NullPointerException;
+
+	/**
 	 * Flat map the payload value (turn one event into 0..n events of
 	 * potentially another type).
 	 * 
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamExecutors.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamExecutors.java
deleted file mode 100644
index 7c2509e..0000000
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamExecutors.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (c) OSGi Alliance (2017). All Rights Reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.osgi.util.pushstream;
-
-import static java.util.Objects.requireNonNull;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.osgi.util.promise.PromiseExecutors;
-
-class PushStreamExecutors extends PromiseExecutors {
-	PushStreamExecutors(Executor executor, ScheduledExecutorService scheduler) {
-		super(requireNonNull(executor), requireNonNull(scheduler));
-	}
-
-	void execute(Runnable operation) {
-		executor().execute(operation);
-	}
-
-	ScheduledFuture< ? > schedule(Runnable operation, long delay,
-			TimeUnit unit) {
-		return scheduledExecutor().schedule(operation, delay, unit);
-	}
-
-	@Override
-	protected Executor executor() {
-		return super.executor();
-	}
-
-	@Override
-	protected ScheduledExecutorService scheduledExecutor() {
-		return super.scheduledExecutor();
-	}
-}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java
index c7d861b..ecd8bf4 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java
@@ -23,6 +23,7 @@
 import static org.osgi.util.pushstream.QueuePolicyOption.FAIL;
 
 import java.util.Iterator;
+import java.util.Objects;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -36,6 +37,8 @@
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Stream;
 
+import org.osgi.util.promise.PromiseFactory;
+
 /**
  * A factory for {@link PushStream} instances, and utility methods for handling
  * {@link PushEventSource}s and {@link PushEventConsumer}s
@@ -164,7 +167,7 @@
 			workerToUse = Executors.newFixedThreadPool(parallelism);
 			closeExecutorOnClose = true;
 		} else {
-			workerToUse = executor;
+			workerToUse = Objects.requireNonNull(executor);
 			closeExecutorOnClose = false;
 		}
 
@@ -174,7 +177,7 @@
 			timerToUse = acquireScheduler();
 			releaseSchedulerOnClose = true;
 		} else {
-			timerToUse = scheduler;
+			timerToUse = Objects.requireNonNull(scheduler);
 			releaseSchedulerOnClose = false;
 		}
 
@@ -191,7 +194,7 @@
 		}
 
 		PushStream<T> stream = new BufferedPushStreamImpl<>(this,
-				new PushStreamExecutors(workerToUse, timerToUse), queue,
+				new PromiseFactory(workerToUse, timerToUse), queue,
 				parallelism, queuePolicy,
 				pushbackPolicy, aec -> {
 					try {
@@ -231,7 +234,7 @@
 			workerToUse = Executors.newFixedThreadPool(2);
 			closeExecutorOnClose = true;
 		} else {
-			workerToUse = executor;
+			workerToUse = Objects.requireNonNull(executor);
 			closeExecutorOnClose = false;
 		}
 
@@ -241,11 +244,11 @@
 			timerToUse = acquireScheduler();
 			releaseSchedulerOnClose = true;
 		} else {
-			timerToUse = scheduler;
+			timerToUse = Objects.requireNonNull(scheduler);
 			releaseSchedulerOnClose = false;
 		}
 		PushStream<T> stream = new UnbufferedPushStreamImpl<>(this,
-				new PushStreamExecutors(workerToUse, timerToUse),
+				new PromiseFactory(workerToUse, timerToUse),
 				aec -> {
 					try {
 						return eventSource.open(aec);
@@ -533,7 +536,7 @@
 			toUse = Executors.newFixedThreadPool(parallelism);
 			closeExecutorOnClose = true;
 		} else {
-			toUse = executor;
+			toUse = Objects.requireNonNull(executor);
 			closeExecutorOnClose = false;
 		}
 
@@ -546,7 +549,7 @@
 		}
 
 		SimplePushEventSourceImpl<T,U> spes = new SimplePushEventSourceImpl<T,U>(
-				new PushStreamExecutors(toUse, acquireScheduler()), queuePolicy,
+				new PromiseFactory(toUse, acquireScheduler()), queuePolicy,
 				queue, parallelism,
 				() -> {
 					try {
@@ -712,7 +715,7 @@
 			workerToUse = Executors.newFixedThreadPool(2);
 			closeExecutorOnClose = true;
 		} else {
-			workerToUse = executor;
+			workerToUse = Objects.requireNonNull(executor);
 			closeExecutorOnClose = false;
 		}
 
@@ -722,12 +725,12 @@
 			timerToUse = acquireScheduler();
 			releaseSchedulerOnClose = true;
 		} else {
-			timerToUse = scheduler;
+			timerToUse = Objects.requireNonNull(scheduler);
 			releaseSchedulerOnClose = false;
 		}
 
 		PushStream<T> stream = new UnbufferedPushStreamImpl<T,BlockingQueue<PushEvent< ? extends T>>>(
-				this, new PushStreamExecutors(workerToUse, timerToUse), aec -> {
+				this, new PromiseFactory(workerToUse, timerToUse), aec -> {
 					return () -> { /* No action to take */ };
 				}) {
 
@@ -736,7 +739,7 @@
 				if (super.begin()) {
 					Iterator<T> it = items.iterator();
 
-					executors.execute(() -> pushData(it));
+					promiseFactory.executor().execute(() -> pushData(it));
 
 					return true;
 				}
@@ -753,8 +756,9 @@
 								close();
 								return;
 							} else {
-								executors.schedule(
-										() -> executors
+								promiseFactory.scheduledExecutor()
+										.schedule(
+										() -> promiseFactory.executor()
 												.execute(() -> pushData(it)),
 										returnValue, MILLISECONDS);
 								return;
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java
index f47bcb1..314ae08 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java
@@ -76,9 +76,9 @@
 	 * {@link #open(PushEventConsumer)} this source, and will receive subsequent
 	 * events.
 	 *
-	 * @param e the error
+	 * @param t the error
 	 */
-	void error(Exception e);
+	void error(Throwable t);
 
 	/**
 	 * Determine whether there are any {@link PushEventConsumer}s for this
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java
index 094a580..478d0e4 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java
@@ -28,15 +28,15 @@
 
 import org.osgi.util.promise.Deferred;
 import org.osgi.util.promise.Promise;
-import org.osgi.util.promise.Promises;
+import org.osgi.util.promise.PromiseFactory;
 
 class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
 		implements SimplePushEventSource<T> {
 	
 	private final Object								lock		= new Object();
 
-	private final PushStreamExecutors					executors;
-	private final PushStreamExecutors					sameThread;
+	private final PromiseFactory						promiseFactory;
+	private final PromiseFactory						sameThread;
 
 	private final QueuePolicy<T,U>						queuePolicy;
 
@@ -57,13 +57,13 @@
 	private boolean										waitForFinishes;
 
 
-	public SimplePushEventSourceImpl(PushStreamExecutors executors,
+	public SimplePushEventSourceImpl(PromiseFactory promiseFactory,
 			QueuePolicy<T,U> queuePolicy,
 			U queue, int parallelism, Runnable onClose) {
-		this.executors = executors;
-		this.sameThread = new PushStreamExecutors(
-				PushStreamExecutors.inlineExecutor(),
-				executors.scheduledExecutor());
+		this.promiseFactory = promiseFactory;
+		this.sameThread = new PromiseFactory(
+				PromiseFactory.inlineExecutor(),
+				promiseFactory.scheduledExecutor());
 		this.queuePolicy = queuePolicy;
 		this.queue = queue;
 		this.parallelism = parallelism;
@@ -111,7 +111,7 @@
 
 	private void doSend(PushEventConsumer< ? super T> pec, PushEvent<T> event) {
 		try {
-			executors.execute(() -> safePush(pec, event));
+			promiseFactory.executor().execute(() -> safePush(pec, event));
 		} catch (RejectedExecutionException ree) {
 			// TODO log?
 			if (!event.isTerminal()) {
@@ -126,7 +126,7 @@
 			PushEventConsumer< ? super T> pec, PushEvent<T> event) {
 		Deferred<Long> d = sameThread.deferred();
 		try {
-			executors.execute(
+			promiseFactory.executor().execute(
 					() -> d.resolve(Long.valueOf(
 							System.nanoTime() + safePush(pec, event))));
 		} catch (RejectedExecutionException ree) {
@@ -206,8 +206,8 @@
 	}
 
 	@Override
-	public void error(Exception e) {
-		enqueueEvent(PushEvent.error(e));
+	public void error(Throwable t) {
+		enqueueEvent(PushEvent.error(t));
 	}
 
 	private void enqueueEvent(PushEvent<T> event) {
@@ -237,7 +237,7 @@
 			"unchecked", "boxing"
 	})
 	private void startWorker() {
-		executors.execute(() -> {
+		promiseFactory.executor().execute(() -> {
 			try {
 				
 				for(;;) {
@@ -287,7 +287,8 @@
 								- System.nanoTime();
 
 						if (toWait > 0) {
-							executors.schedule(this::startWorker, toWait,
+							promiseFactory.scheduledExecutor().schedule(
+									this::startWorker, toWait,
 									NANOSECONDS);
 							return;
 						}
@@ -297,14 +298,15 @@
 							long toWait = p.getValue() - System.nanoTime();
 
 							if (toWait > 0) {
-								executors.schedule(this::startWorker, toWait,
+								promiseFactory.scheduledExecutor().schedule(
+										this::startWorker, toWait,
 										NANOSECONDS);
 							} else {
 								startWorker();
 							}
 							return p;
 						}, p -> close(
-								PushEvent.error((Exception) p.getFailure())));
+								PushEvent.error(p.getFailure())));
 						return;
 					}
 				}
@@ -346,8 +348,7 @@
 					return doCall(event, pec);
 				}
 			}).collect(toList());
-			return Promises
-					.all(sameThread.deferred(), calls)
+			return sameThread.all(calls)
 					.map(l -> l.stream().max(Long::compareTo).orElseGet(
 							() -> Long.valueOf(System.nanoTime())));
 		}
@@ -375,17 +376,17 @@
 
 			if (connected.isEmpty()) {
 				if (connectPromise == null) {
-					connectPromise = executors.deferred();
+					connectPromise = promiseFactory.deferred();
 				}
 				return connectPromise.getPromise();
 			} else {
-				return executors.resolved(null);
+				return promiseFactory.resolved(null);
 			}
 		}
 	}
 
 	private Promise<Void> closedConnectPromise() {
-		return executors.failed(new IllegalStateException(
+		return promiseFactory.failed(new IllegalStateException(
 				"This SimplePushEventSource is closed"));
 	}
 
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java
index 53400c4..eb3e933 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java
@@ -23,6 +23,8 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 
+import org.osgi.util.promise.PromiseFactory;
+
 class UnbufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
 	extends AbstractPushStreamImpl<T> implements PushStream<T> {
 	
@@ -31,9 +33,9 @@
 	protected final AtomicReference<AutoCloseable>					upstream	= new AtomicReference<AutoCloseable>();
 	
 	UnbufferedPushStreamImpl(PushStreamProvider psp,
-			PushStreamExecutors executors,
+			PromiseFactory promiseFactory,
 			Function<PushEventConsumer<T>,AutoCloseable> connector) {
-		super(psp, executors);
+		super(psp, promiseFactory);
 		this.connector = connector;
 	}