Add pushstream API and fix import ranges

Change-Id: I2bd80833a975650125ee2bf69900d2f99de4f0ce
Signed-off-by: Thomas Watson <tjwatson@us.ibm.com>
diff --git a/bundles/org.eclipse.equinox.logstream/META-INF/MANIFEST.MF b/bundles/org.eclipse.equinox.logstream/META-INF/MANIFEST.MF
index 3568239..74f3a73 100644
--- a/bundles/org.eclipse.equinox.logstream/META-INF/MANIFEST.MF
+++ b/bundles/org.eclipse.equinox.logstream/META-INF/MANIFEST.MF
@@ -5,9 +5,10 @@
 Bundle-Version: 1.0.0.qualifier
 Bundle-Activator: org.eclipse.equinox.internal.log.stream.LogStreamFactoryImpl
 Bundle-RequiredExecutionEnvironment: JavaSE-1.8
-Import-Package: org.osgi.framework;version="1.3.0",
- org.osgi.service.log;version="1.4.0",
- org.osgi.util.promise;version="1.0.0",
- org.osgi.util.pushstream;version="1.0.0",
- org.osgi.util.tracker;version="1.5.0"
+Import-Package: org.osgi.framework;version="[1.9.0,2.0.0)",
+ org.osgi.service.log;version="[1.4.0,2.0.0)",
+ org.osgi.util.promise;version="[1.0.0,2.0.0)",
+ org.osgi.util.tracker;version="[1.5.0,2.0.0)"
 Bundle-ActivationPolicy: lazy
+Export-Package: org.osgi.service.log.stream;version="1.0.0",
+ org.osgi.util.pushstream;version="1.0.0"
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/AbstractBufferBuilder.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/AbstractBufferBuilder.java
new file mode 100644
index 0000000..a37e407
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/AbstractBufferBuilder.java
@@ -0,0 +1,60 @@
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+
+abstract class AbstractBufferBuilder<R, T, U extends BlockingQueue<PushEvent< ? extends T>>>
+		implements BufferBuilder<R,T,U> {
+
+	protected Executor				worker;
+	protected int					concurrency;
+	protected PushbackPolicy<T,U>	backPressure;
+	protected QueuePolicy<T,U>		bufferingPolicy;
+	protected U						buffer;
+
+	@Override
+	public BufferBuilder<R,T,U> withBuffer(U queue) {
+		this.buffer = queue;
+		return this;
+	}
+
+	@Override
+	public BufferBuilder<R,T,U> withQueuePolicy(
+			QueuePolicy<T,U> queuePolicy) {
+		this.bufferingPolicy = queuePolicy;
+		return this;
+	}
+
+	@Override
+	public BufferBuilder<R,T,U> withQueuePolicy(
+			QueuePolicyOption queuePolicyOption) {
+		this.bufferingPolicy = queuePolicyOption.getPolicy();
+		return this;
+	}
+
+	@Override
+	public BufferBuilder<R,T,U> withPushbackPolicy(
+			PushbackPolicy<T,U> pushbackPolicy) {
+		this.backPressure = pushbackPolicy;
+		return this;
+	}
+
+	@Override
+	public BufferBuilder<R,T,U> withPushbackPolicy(
+			PushbackPolicyOption pushbackPolicyOption, long time) {
+		this.backPressure = pushbackPolicyOption.getPolicy(time);
+		return this;
+	}
+
+	@Override
+	public BufferBuilder<R,T,U> withParallelism(int parallelism) {
+		this.concurrency = parallelism;
+		return this;
+	}
+
+	@Override
+	public BufferBuilder<R,T,U> withExecutor(Executor executor) {
+		this.worker = executor;
+		return this;
+	}
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java
new file mode 100644
index 0000000..2293c1a
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java
@@ -0,0 +1,1480 @@
+package org.osgi.util.pushstream;
+
+import static java.util.Collections.emptyList;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.*;
+import static org.osgi.util.pushstream.PushEventConsumer.*;
+
+import java.time.Duration;
+import java.util.AbstractQueue;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import java.util.function.IntSupplier;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+
+import org.osgi.util.promise.Deferred;
+import org.osgi.util.promise.Promise;
+import org.osgi.util.promise.TimeoutException;
+import org.osgi.util.pushstream.PushEvent.EventType;
+
+abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
+	
+	static enum State {
+		BUILDING, STARTED, CLOSED
+	}
+	
+	protected final PushStreamProvider								psp;
+	
+	protected final Executor										defaultExecutor;
+	protected final ScheduledExecutorService						scheduler;
+
+	protected final AtomicReference<State> closed = new AtomicReference<>(BUILDING);
+	
+	protected final AtomicReference<PushEventConsumer<T>>			next			= new AtomicReference<>();
+	
+	protected final AtomicReference<Runnable> onCloseCallback = new AtomicReference<>();
+	protected final AtomicReference<Consumer<? super Throwable>> onErrorCallback = new AtomicReference<>();
+
+	protected abstract boolean begin();
+	
+	AbstractPushStreamImpl(PushStreamProvider psp,
+			Executor executor, ScheduledExecutorService scheduler) {
+		this.psp = psp;
+		this.defaultExecutor = executor;
+		this.scheduler = scheduler;
+	}
+
+	protected long handleEvent(PushEvent< ? extends T> event) {
+		if(closed.get() != CLOSED) {
+			try {
+				if(event.isTerminal()) {
+					close(event.nodata());
+					return ABORT;
+				} else {
+					PushEventConsumer<T> consumer = next.get();
+					long val;
+					if(consumer == null) {
+						//TODO log a warning
+						val = CONTINUE;
+					} else {
+						val = consumer.accept(event);
+					}
+					if(val < 0) {
+						close();
+					}
+					return val;
+				}
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		}
+		return ABORT;
+	}
+	
+	@Override
+	public void close() {
+		close(PushEvent.close());
+	}
+	
+	protected boolean close(PushEvent<T> event) {
+		if(!event.isTerminal()) {
+			throw new IllegalArgumentException("The event " + event  + " is not a close event.");
+		}
+		if(closed.getAndSet(CLOSED) != CLOSED) {
+			PushEventConsumer<T> aec = next.getAndSet(null);
+			if(aec != null) {
+				try {
+					aec.accept(event);
+				} catch (Exception e) {
+					// TODO Auto-generated catch block
+					e.printStackTrace();
+				}
+			}
+			Runnable handler = onCloseCallback.getAndSet(null);
+			if(handler != null) {
+				try {
+					handler.run();
+				} catch (Exception e) {
+					// TODO Auto-generated catch block
+					e.printStackTrace();
+				}
+			}
+			if (event.getType() == EventType.ERROR) {
+				Consumer<? super Throwable> errorHandler = onErrorCallback.getAndSet(null);
+				if(errorHandler != null) {
+					try {
+						errorHandler.accept(event.getFailure());
+					} catch (Exception e) {
+						// TODO Auto-generated catch block
+						e.printStackTrace();
+					}
+				}
+			}
+			return true;
+		}
+		return false;
+	}
+	
+	@Override
+	public PushStream<T> onClose(Runnable closeHandler) {
+		if(onCloseCallback.compareAndSet(null, closeHandler)) {
+			if(closed.get() == State.CLOSED && onCloseCallback.compareAndSet(closeHandler, null)) {
+				closeHandler.run();
+			}
+		} else {
+			throw new IllegalStateException("A close handler has already been defined for this stream object");
+		}
+		return this;
+	}
+
+	@Override
+	public PushStream<T> onError(Consumer< ? super Throwable> closeHandler) {
+		if(onErrorCallback.compareAndSet(null, closeHandler)) {
+			if(closed.get() == State.CLOSED) { 
+				//TODO log already closed
+				onErrorCallback.set(null);
+			}
+		} else {
+			throw new IllegalStateException("A close handler has already been defined for this stream object");
+		}
+		return this;
+	}
+
+	private void updateNext(PushEventConsumer<T> consumer) {
+		if(!next.compareAndSet(null, consumer)) {
+			throw new IllegalStateException("This stream has already been chained");
+		} else if(closed.get() == CLOSED && next.compareAndSet(consumer, null)) {
+			try {
+				consumer.accept(PushEvent.close());
+			} catch (Exception e) {
+				//TODO log
+				e.printStackTrace();
+			}
+		}
+	}
+
+	@Override
+	public PushStream<T> filter(Predicate< ? super T> predicate) {
+		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
+				psp, defaultExecutor, scheduler, this);
+		updateNext((event) -> {
+			try {
+				if (!event.isTerminal()) {
+					if (predicate.test(event.getData())) {
+						return eventStream.handleEvent(event);
+					} else {
+						return CONTINUE;
+					}
+				}
+				return eventStream.handleEvent(event);
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		return eventStream;
+	}
+
+	@Override
+	public <R> PushStream<R> map(Function< ? super T, ? extends R> mapper) {
+		
+		AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>(
+				psp, defaultExecutor, scheduler, this);
+		updateNext(event -> {
+			try {
+				if (!event.isTerminal()) {
+					return eventStream.handleEvent(
+							PushEvent.data(mapper.apply(event.getData())));
+				} else {
+					return eventStream.handleEvent(event.nodata());
+				}
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		return eventStream;
+	}
+
+	@Override
+	public <R> PushStream<R> flatMap(
+			Function< ? super T, ? extends PushStream< ? extends R>> mapper) {
+		AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>(
+				psp, defaultExecutor, scheduler, this);
+
+		PushEventConsumer<R> consumer = e -> {
+			switch (e.getType()) {
+				case ERROR :
+					close(e.nodata());
+					return ABORT;
+				case CLOSE :
+					// Close should allow the next flat mapped entry
+					// without closing the stream;
+					return ABORT;
+				case DATA :
+					long returnValue = eventStream.handleEvent(e);
+					if (returnValue < 0) {
+						close();
+						return ABORT;
+					}
+					return returnValue;
+				default :
+					throw new IllegalArgumentException(
+							"The event type " + e.getType() + " is unknown");
+			}
+		};
+
+		updateNext(event -> {
+			try {
+				if (!event.isTerminal()) {
+					PushStream< ? extends R> mappedStream = mapper
+							.apply(event.getData());
+
+					return mappedStream.forEachEvent(consumer)
+							.getValue()
+							.longValue();
+				} else {
+					return eventStream.handleEvent(event.nodata());
+				}
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		return eventStream;
+	}
+
+	@Override
+	public PushStream<T> distinct() {
+		Set<T> set = Collections.<T>newSetFromMap(new ConcurrentHashMap<>());
+		return filter(set::add);
+	}
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Override
+	public PushStream<T> sorted() {
+		return sorted((Comparator)Comparator.naturalOrder());
+	}
+
+	@Override
+	public PushStream<T> sorted(Comparator< ? super T> comparator) {
+		List<T> list = Collections.synchronizedList(new ArrayList<>());
+		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
+				psp, defaultExecutor, scheduler, this);
+		updateNext(event -> {
+			try {
+				switch(event.getType()) {
+					case DATA : 
+						list.add(event.getData());
+						return CONTINUE;
+					case CLOSE :
+						list.sort(comparator);
+						sorted: for (T t : list) {
+							if (eventStream
+									.handleEvent(PushEvent.data(t)) < 0) {
+								break sorted;
+							}
+						}
+						// Fall through
+					case ERROR :
+						eventStream.handleEvent(event);
+						return ABORT;
+				}
+				return eventStream.handleEvent(event.nodata());
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		return eventStream;
+	}
+
+	@Override
+	public PushStream<T> limit(long maxSize) {
+		if(maxSize <= 0) {
+			throw new IllegalArgumentException("The limit must be greater than zero");
+		}
+		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
+				psp, defaultExecutor, scheduler, this);
+		AtomicLong counter = new AtomicLong(maxSize);
+		updateNext(event -> {
+			try {
+				if (!event.isTerminal()) {
+					long count = counter.decrementAndGet();
+					if (count > 0) {
+						return eventStream.handleEvent(event);
+					} else if (count == 0) {
+						eventStream.handleEvent(event);
+					}
+					return ABORT;
+				} else {
+					return eventStream.handleEvent(event.nodata());
+				}
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		return eventStream;
+	}
+	
+	@Override
+	public PushStream<T> limit(Duration maxTime) {
+		
+		Runnable start = () -> scheduler.schedule(() -> close(),
+				maxTime.toNanos(), NANOSECONDS);
+
+		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<T>(
+				psp, defaultExecutor, scheduler, this) {
+			@Override
+			protected void beginning() {
+				start.run();
+			}
+		};
+		updateNext((event) -> {
+			try {
+				return eventStream.handleEvent(event);
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		return eventStream;
+	}
+
+	@Override
+	public PushStream<T> timeout(Duration maxTime) {
+
+		AtomicLong lastTime = new AtomicLong();
+		long timeout = maxTime.toNanos();
+
+		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<T>(
+				psp, defaultExecutor, scheduler, this) {
+			@Override
+			protected void beginning() {
+				lastTime.set(System.nanoTime());
+				scheduler.schedule(() -> check(lastTime, timeout), timeout,
+						NANOSECONDS);
+			}
+		};
+		updateNext((event) -> {
+			try {
+				return eventStream.handleEvent(event);
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		return eventStream;
+	}
+
+	void check(AtomicLong lastTime, long timeout) {
+		long now = System.nanoTime();
+
+		long elapsed = now - lastTime.get();
+
+		if (elapsed < timeout) {
+			scheduler.schedule(() -> check(lastTime, timeout),
+					timeout - elapsed, NANOSECONDS);
+		} else {
+			close(PushEvent.error(new TimeoutException()));
+		}
+	}
+
+	@Override
+	public PushStream<T> skip(long n) {
+		if (n < 0) {
+			throw new IllegalArgumentException(
+					"The number to skip must be greater than or equal to zero");
+		}
+		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
+				psp, defaultExecutor, scheduler, this);
+		AtomicLong counter = new AtomicLong(n);
+		updateNext(event -> {
+			try {
+				if (!event.isTerminal()) {
+					if (counter.get() > 0 && counter.decrementAndGet() >= 0) {
+						return CONTINUE;
+					} else {
+						return eventStream.handleEvent(event);
+					} 				
+				} else {
+					return eventStream.handleEvent(event.nodata());
+				}
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		return eventStream;
+	}
+
+	@Override
+	public PushStream<T> fork(int n, int delay, Executor ex) {
+		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
+				psp, ex, scheduler, this);
+		Semaphore s = new Semaphore(n);
+		updateNext(event -> {
+			try {
+				if (event.isTerminal()) {
+					s.acquire(n);
+					eventStream.close(event.nodata());
+					return ABORT;
+				}
+	
+				s.acquire(1);
+	
+				ex.execute(() -> {
+					try {
+						if (eventStream.handleEvent(event) < 0) {
+							eventStream.close(PushEvent.close());
+						}
+					} catch (Exception e1) {
+						close(PushEvent.error(e1));
+					} finally {
+						s.release(1);
+					}
+				});
+	
+				return s.getQueueLength() * delay;
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+
+		return eventStream;
+	}
+	
+	@Override
+	public PushStream<T> buffer() {
+		return psp.createStream(c -> {
+			forEachEvent(c);
+			return this;
+		});
+	}
+
+	@Override
+	public <U extends BlockingQueue<PushEvent< ? extends T>>> PushStreamBuilder<T,U> buildBuffer() {
+		return psp.buildStream(c -> {
+			forEachEvent(c);
+			return this;
+		});
+	}
+
+	@Override
+	public PushStream<T> merge(
+			PushEventSource< ? extends T> source) {
+		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
+				psp, defaultExecutor, scheduler, this);
+		AtomicInteger count = new AtomicInteger(2);
+		PushEventConsumer<T> consumer = event -> {
+			try {
+				if (!event.isTerminal()) {
+					return eventStream.handleEvent(event);
+				}
+	
+				if (count.decrementAndGet() == 0) {
+					eventStream.handleEvent(event.nodata());
+					return ABORT;
+				}
+				return CONTINUE;
+			} catch (Exception e) {
+				PushEvent<T> error = PushEvent.error(e);
+				close(error);
+				eventStream.close(event.nodata());
+				return ABORT;
+			}
+		};
+		updateNext(consumer);
+		AutoCloseable second;
+		try {
+			second = source.open((PushEvent< ? extends T> event) -> {
+				return consumer.accept(event);
+			});
+		} catch (Exception e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+			throw new IllegalStateException(
+					"Unable to merge events as the event source could not be opened.",
+					e);
+		}
+		
+		return eventStream.onClose(() -> {
+			try {
+				second.close();
+			} catch (Exception e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			} 
+		}).map(Function.identity());
+	}
+
+	@Override
+	public PushStream<T> merge(PushStream< ? extends T> source) {
+
+		AtomicInteger count = new AtomicInteger(2);
+		Consumer<AbstractPushStreamImpl<T>> start = downstream -> {
+			PushEventConsumer<T> consumer = e -> {
+				long toReturn;
+				try {
+					if (!e.isTerminal()) {
+						toReturn = downstream.handleEvent(e);
+					} else if (count.decrementAndGet() == 0) {
+						downstream.handleEvent(e);
+						toReturn = ABORT;
+					} else {
+						return ABORT;
+					}
+				} catch (Exception ex) {
+					try {
+						downstream.handleEvent(PushEvent.error(ex));
+					} catch (Exception ex2) { /* Just ignore this */}
+					toReturn = ABORT;
+				}
+				if (toReturn < 0) {
+					try {
+						close();
+					} catch (Exception ex2) { /* Just ignore this */}
+					try {
+						source.close();
+					} catch (Exception ex2) { /* Just ignore this */}
+				}
+				return toReturn;
+			};
+			forEachEvent(consumer);
+			source.forEachEvent(consumer);
+		};
+
+		@SuppressWarnings("resource")
+		AbstractPushStreamImpl<T> eventStream = new AbstractPushStreamImpl<T>(
+				psp, defaultExecutor, scheduler) {
+			@Override
+			protected boolean begin() {
+				if (closed.compareAndSet(BUILDING, STARTED)) {
+					start.accept(this);
+					return true;
+				}
+				return false;
+			}
+		};
+		
+
+		return eventStream.onClose(() -> {
+			try {
+				close();
+			} catch (Exception e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			}
+			try {
+				source.close();
+			} catch (Exception e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			} 
+		}).map(Function.identity());
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public PushStream<T>[] split(Predicate< ? super T>... predicates) {
+		Predicate<? super T>[] tests = Arrays.copyOf(predicates, predicates.length);
+		AbstractPushStreamImpl<T>[] rsult = new AbstractPushStreamImpl[tests.length];
+		for(int i = 0; i < tests.length; i++) {
+			rsult[i] = new IntermediatePushStreamImpl<>(psp, defaultExecutor,
+					scheduler, this);
+		}
+
+		Boolean[] array = new Boolean[tests.length];
+		Arrays.fill(array, Boolean.TRUE);
+		AtomicReferenceArray<Boolean> off = new AtomicReferenceArray<>(array);
+
+		AtomicInteger count = new AtomicInteger(tests.length);
+		updateNext(event -> {
+			if (!event.isTerminal()) {
+				long delay = CONTINUE;
+				for (int i = 0; i < tests.length; i++) {
+					try {
+						if (off.get(i).booleanValue()
+								&& tests[i].test(event.getData())) {
+							long accept = rsult[i].handleEvent(event);
+							if (accept < 0) {
+								off.set(i, Boolean.TRUE);
+								count.decrementAndGet();
+							} else if (accept > delay) {
+								accept = delay;
+							}
+						}
+					} catch (Exception e) {
+						try {
+							rsult[i].close(PushEvent.error(e));
+						} catch (Exception e2) {
+							//TODO log
+						}
+						off.set(i, Boolean.TRUE);
+					}
+				}
+				if (count.get() == 0)
+					return ABORT;
+
+				return delay;
+			}
+			for (AbstractPushStreamImpl<T> as : rsult) {
+				try {
+					as.handleEvent(event.nodata());
+				} catch (Exception e) {
+					try {
+						as.close(PushEvent.error(e));
+					} catch (Exception e2) {
+						//TODO log
+					}
+				}
+			}
+			return ABORT;
+		});
+		return Arrays.copyOf(rsult, tests.length);
+	}
+
+	@Override
+	public PushStream<T> sequential() {
+		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
+				psp, defaultExecutor, scheduler, this);
+		Lock lock = new ReentrantLock();
+		updateNext((event) -> {
+			try {
+				lock.lock();
+				try {
+					return eventStream.handleEvent(event);
+				} finally {
+					lock.unlock();
+				}
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		return eventStream;
+	}
+
+	@Override
+	public <R> PushStream<R> coalesce(
+			Function< ? super T,Optional<R>> accumulator) {
+		AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>(
+				psp, defaultExecutor, scheduler, this);
+		updateNext((event) -> {
+			try {
+				if (!event.isTerminal()) {
+					Optional<PushEvent<R>> coalesced = accumulator
+							.apply(event.getData()).map(PushEvent::data);
+					if (coalesced.isPresent()) {
+						try {
+							return eventStream.handleEvent(coalesced.get());
+						} catch (Exception ex) {
+							close(PushEvent.error(ex));
+							return ABORT;
+						}
+					} else {
+						return CONTINUE;
+					}
+				}
+				return eventStream.handleEvent(event.nodata());
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		return eventStream;
+	}
+
+	@Override
+	public <R> PushStream<R> coalesce(int count, Function<Collection<T>,R> f) {
+		if (count <= 0)
+			throw new IllegalArgumentException(
+					"A coalesce operation must collect a positive number of events");
+		// This could be optimised to only use a single collection queue.
+		// It would save some GC, but is it worth it?
+		return coalesce(() -> count, f);
+	}
+
+	@Override
+	public <R> PushStream<R> coalesce(IntSupplier count,
+			Function<Collection<T>,R> f) {
+		AtomicReference<Queue<T>> queueRef = new AtomicReference<Queue<T>>(
+				null);
+
+		Runnable init = () -> queueRef
+				.set(getQueueForInternalBuffering(count.getAsInt()));
+
+		@SuppressWarnings("resource")
+		AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<R>(
+				psp, defaultExecutor, scheduler, this) {
+			@Override
+			protected void beginning() {
+				init.run();
+			}
+		};
+
+		AtomicBoolean endPending = new AtomicBoolean();
+		Object lock = new Object();
+		updateNext((event) -> {
+			try {
+				Queue<T> queue;
+				if (!event.isTerminal()) {
+					synchronized (lock) {
+						for (;;) {
+							queue = queueRef.get();
+							if (queue == null) {
+								if (endPending.get()) {
+									return ABORT;
+								} else {
+									continue;
+								}
+							} else if (queue.offer(event.getData())) {
+								return CONTINUE;
+							} else {
+								queueRef.lazySet(null);
+								break;
+							}
+						}
+					}
+
+					queueRef.set(
+							getQueueForInternalBuffering(count.getAsInt()));
+
+					// This call is on the same thread and so must happen
+					// outside
+					// the synchronized block.
+					return aggregateAndForward(f, eventStream, event,
+							queue);
+				} else {
+					synchronized (lock) {
+						queue = queueRef.get();
+						queueRef.lazySet(null);
+						endPending.set(true);
+					}
+					if (queue != null) {
+						eventStream.handleEvent(
+								PushEvent.data(f.apply(queue)));
+					}
+				}
+				return eventStream.handleEvent(event.nodata());
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		return eventStream;
+	}
+
+	private <R> long aggregateAndForward(Function<Collection<T>,R> f,
+			AbstractPushStreamImpl<R> eventStream,
+			PushEvent< ? extends T> event, Queue<T> queue) {
+		if (!queue.offer(event.getData())) {
+			((ArrayQueue<T>) queue).forcePush(event.getData());
+		}
+		return eventStream.handleEvent(PushEvent.data(f.apply(queue)));
+	}
+	
+	
+	@Override
+	public <R> PushStream<R> window(Duration time,
+			Function<Collection<T>,R> f) {
+		return window(time, defaultExecutor, f);
+	}
+
+	@Override
+	public <R> PushStream<R> window(Duration time, Executor executor,
+			Function<Collection<T>,R> f) {
+		return window(() -> time, () -> 0, executor, (t, c) -> f.apply(c));
+	}
+
+	@Override
+	public <R> PushStream<R> window(Supplier<Duration> time,
+			IntSupplier maxEvents,
+			BiFunction<Long,Collection<T>,R> f) {
+		return window(time, maxEvents, defaultExecutor, f);
+	}
+
+	@Override
+	public <R> PushStream<R> window(Supplier<Duration> time,
+			IntSupplier maxEvents, Executor ex,
+			BiFunction<Long,Collection<T>,R> f) {
+
+		AtomicLong timestamp = new AtomicLong();
+		AtomicLong previousWindowSize = new AtomicLong();
+		AtomicLong counter = new AtomicLong();
+		Object lock = new Object();
+		AtomicReference<Queue<T>> queueRef = new AtomicReference<Queue<T>>(
+				null);
+
+		// This code is declared as a separate block to avoid any confusion
+		// about which instance's methods and variables are in scope
+		Consumer<AbstractPushStreamImpl<R>> begin = p -> {
+
+			synchronized (lock) {
+				timestamp.lazySet(System.nanoTime());
+				long count = counter.get();
+
+
+				long windowSize = time.get().toNanos();
+				previousWindowSize.set(windowSize);
+				scheduler.schedule(
+						getWindowTask(p, f, time, maxEvents, lock, count,
+								queueRef, timestamp, counter,
+								previousWindowSize, ex),
+						windowSize, NANOSECONDS);
+			}
+
+			queueRef.set(getQueueForInternalBuffering(maxEvents.getAsInt()));
+		};
+
+		@SuppressWarnings("resource")
+		AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<R>(
+				psp, ex, scheduler, this) {
+			@Override
+			protected void beginning() {
+				begin.accept(this);
+			}
+		};
+
+		AtomicBoolean endPending = new AtomicBoolean(false);
+		updateNext((event) -> {
+			try {
+				if (eventStream.closed.get() == CLOSED) {
+					return ABORT;
+				}
+				Queue<T> queue;
+				if (!event.isTerminal()) {
+					long elapsed;
+					long newCount;
+					synchronized (lock) {
+						for (;;) {
+							queue = queueRef.get();
+							if (queue == null) {
+								if (endPending.get()) {
+									return ABORT;
+								} else {
+									continue;
+								}
+							} else if (queue.offer(event.getData())) {
+								return CONTINUE;
+							} else {
+								queueRef.lazySet(null);
+								break;
+							}
+						}
+
+						long now = System.nanoTime();
+						elapsed = now - timestamp.get();
+						timestamp.lazySet(now);
+						newCount = counter.get() + 1;
+						counter.lazySet(newCount);
+
+						// This is a non-blocking call, and must happen in the
+						// synchronized block to avoid re=ordering the executor
+						// enqueue with a subsequent incoming close operation
+						aggregateAndForward(f, eventStream, event, queue,
+								ex, elapsed);
+					}
+					// These must happen outside the synchronized block as we
+					// call out to user code
+					queueRef.set(
+							getQueueForInternalBuffering(maxEvents.getAsInt()));
+					long nextWindow = time.get().toNanos();
+					long backpressure = previousWindowSize.getAndSet(nextWindow)
+							- elapsed;
+					scheduler.schedule(
+							getWindowTask(eventStream, f, time, maxEvents, lock,
+									newCount, queueRef, timestamp, counter,
+									previousWindowSize, ex),
+							nextWindow, NANOSECONDS);
+
+					return backpressure < 0 ? CONTINUE
+							: NANOSECONDS.toMillis(backpressure);
+				} else {
+					long elapsed;
+					synchronized (lock) {
+						queue = queueRef.get();
+						queueRef.lazySet(null);
+						endPending.set(true);
+						long now = System.nanoTime();
+						elapsed = now - timestamp.get();
+						counter.lazySet(counter.get() + 1);
+					}
+					Collection<T> collected = queue == null ? emptyList()
+							: queue;
+					ex.execute(() -> {
+						try {
+							eventStream
+									.handleEvent(PushEvent.data(f.apply(
+											Long.valueOf(NANOSECONDS
+													.toMillis(elapsed)),
+											collected)));
+						} catch (Exception e) {
+							close(PushEvent.error(e));
+						}
+					});
+				}
+				ex.execute(() -> eventStream.handleEvent(event.nodata()));
+				return ABORT;
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		return eventStream;
+	}
+
+	protected Queue<T> getQueueForInternalBuffering(int size) {
+		if (size == 0) {
+			return new LinkedList<T>();
+		} else {
+			return new ArrayQueue<>(size - 1);
+		}
+	}
+	
+	@SuppressWarnings("unchecked")
+	/**
+	 * A special queue that keeps one element in reserve and can have that last
+	 * element set using forcePush. After the element is set the capacity is
+	 * permanently increased by one and cannot grow further.
+	 * 
+	 * @param <E> The element type
+	 */
+	private static class ArrayQueue<E> extends AbstractQueue<E>
+			implements Queue<E> {
+
+		final Object[]	store;
+
+		int				normalLength;
+
+		int				nextIndex;
+
+		int				size;
+
+		ArrayQueue(int capacity) {
+			store = new Object[capacity + 1];
+			normalLength = store.length - 1;
+		}
+
+		@Override
+		public boolean offer(E e) {
+			if (e == null)
+				throw new NullPointerException("Null values are not supported");
+			if (size < normalLength) {
+				store[nextIndex] = e;
+				size++;
+				nextIndex++;
+				nextIndex = nextIndex % normalLength;
+				return true;
+			}
+			return false;
+		}
+
+		public void forcePush(E e) {
+			store[normalLength] = e;
+			normalLength++;
+			size++;
+		}
+
+		@Override
+		public E poll() {
+			if (size == 0) {
+				return null;
+			} else {
+				int idx = nextIndex - size;
+				if (idx < 0) {
+					idx += normalLength;
+				}
+				E value = (E) store[idx];
+				store[idx] = null;
+				size--;
+				return value;
+			}
+		}
+
+		@Override
+		public E peek() {
+			if (size == 0) {
+				return null;
+			} else {
+				int idx = nextIndex - size;
+				if (idx < 0) {
+					idx += normalLength;
+				}
+				return (E) store[idx];
+			}
+		}
+
+		@Override
+		public Iterator<E> iterator() {
+			final int previousNext = nextIndex;
+			return new Iterator<E>() {
+
+				int idx;
+
+				int	remaining	= size;
+
+				{
+					idx = nextIndex - size;
+					if (idx < 0) {
+						idx += normalLength;
+					}
+				}
+
+				@Override
+				public boolean hasNext() {
+					if (nextIndex != previousNext) {
+						throw new ConcurrentModificationException(
+								"The queue was concurrently modified");
+					}
+					return remaining > 0;
+				}
+
+				@Override
+				public E next() {
+					if (!hasNext()) {
+						throw new NoSuchElementException(
+								"The iterator has no more values");
+					}
+					E value = (E) store[idx];
+					idx++;
+					remaining--;
+					if (idx == normalLength) {
+						idx = 0;
+					}
+					return value;
+				}
+
+			};
+		}
+
+		@Override
+		public int size() {
+			return size;
+		}
+
+	}
+
+	private <R> Runnable getWindowTask(AbstractPushStreamImpl<R> eventStream,
+			BiFunction<Long,Collection<T>,R> f, Supplier<Duration> time,
+			IntSupplier maxEvents, Object lock, long expectedCounter,
+			AtomicReference<Queue<T>> queueRef, AtomicLong timestamp,
+			AtomicLong counter, AtomicLong previousWindowSize,
+			Executor executor) {
+		return () -> {
+
+			Queue<T> queue = null;
+			long elapsed;
+			synchronized (lock) {
+				
+				if (counter.get() != expectedCounter) {
+					return;
+				}
+				counter.lazySet(expectedCounter + 1);
+
+				long now = System.nanoTime();
+				elapsed = now - timestamp.get();
+				timestamp.lazySet(now);
+
+				queue = queueRef.get();
+				queueRef.lazySet(null);
+
+				// This is a non-blocking call, and must happen in the
+				// synchronized block to avoid re=ordering the executor
+				// enqueue with a subsequent incoming close operation
+
+				Collection<T> collected = queue == null ? emptyList() : queue;
+				executor.execute(() -> {
+					try {
+						eventStream.handleEvent(PushEvent.data(f.apply(
+								Long.valueOf(NANOSECONDS.toMillis(elapsed)),
+								collected)));
+					} catch (Exception e) {
+						close(PushEvent.error(e));
+					}
+				});
+			}
+
+			// These must happen outside the synchronized block as we
+			// call out to user code
+			long nextWindow = time.get().toNanos();
+			previousWindowSize.set(nextWindow);
+			queueRef.set(getQueueForInternalBuffering(maxEvents.getAsInt()));
+			scheduler.schedule(
+					getWindowTask(eventStream, f, time, maxEvents, lock,
+							expectedCounter + 1, queueRef, timestamp, counter,
+							previousWindowSize, executor),
+					nextWindow, NANOSECONDS);
+		};
+	}
+
+	private <R> void aggregateAndForward(BiFunction<Long,Collection<T>,R> f,
+			AbstractPushStreamImpl<R> eventStream,
+			PushEvent< ? extends T> event, Queue<T> queue, Executor executor,
+			long elapsed) {
+		executor.execute(() -> {
+			try {
+				if (!queue.offer(event.getData())) {
+					((ArrayQueue<T>) queue).forcePush(event.getData());
+				}
+				long result = eventStream.handleEvent(PushEvent.data(
+						f.apply(Long.valueOf(NANOSECONDS.toMillis(elapsed)),
+								queue)));
+				if (result < 0) {
+					close();
+				}
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+			}
+		});
+	}
+
+	@Override
+	public Promise<Void> forEach(Consumer< ? super T> action) {
+		Deferred<Void> d = new Deferred<>();
+		updateNext((event) -> {
+				try {
+					switch(event.getType()) {
+						case DATA:
+							action.accept(event.getData());
+							return CONTINUE;
+						case CLOSE:
+							d.resolve(null);
+							break;
+						case ERROR:
+							d.fail(event.getFailure());
+							break;
+					}
+					close(event.nodata());
+					return ABORT;
+				} catch (Exception e) {
+					d.fail(e);
+					return ABORT;
+				}
+			});
+		begin();
+		return d.getPromise();
+	}
+
+	@Override
+	public Promise<Object[]> toArray() {
+		return collect(Collectors.toList())
+				.map(List::toArray);
+	}
+
+	@Override
+	public <A extends T> Promise<A[]> toArray(IntFunction<A[]> generator) {
+		return collect(Collectors.toList())
+				.map(l -> l.toArray(generator.apply(l.size())));
+	}
+
+	@Override
+	public Promise<T> reduce(T identity, BinaryOperator<T> accumulator) {
+		Deferred<T> d = new Deferred<>();
+		AtomicReference<T> iden = new AtomicReference<T>(identity);
+
+		updateNext(event -> {
+			try {
+				switch(event.getType()) {
+					case DATA:
+						iden.accumulateAndGet(event.getData(), accumulator);
+						return CONTINUE;
+					case CLOSE:
+						d.resolve(iden.get());
+						break;
+					case ERROR:
+						d.fail(event.getFailure());
+						break;
+				}
+				close(event.nodata());
+				return ABORT;
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		begin();
+		return d.getPromise();
+	}
+
+	@Override
+	public Promise<Optional<T>> reduce(BinaryOperator<T> accumulator) {
+		Deferred<Optional<T>> d = new Deferred<>();
+		AtomicReference<T> iden = new AtomicReference<T>(null);
+
+		updateNext(event -> {
+			try {
+				switch(event.getType()) {
+					case DATA:
+						if (!iden.compareAndSet(null, event.getData()))
+							iden.accumulateAndGet(event.getData(), accumulator);
+						return CONTINUE;
+					case CLOSE:
+						d.resolve(Optional.ofNullable(iden.get()));
+						break;
+					case ERROR:
+						d.fail(event.getFailure());
+						break;
+				}
+				close(event.nodata());
+				return ABORT;
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		begin();
+		return d.getPromise();
+	}
+
+	@Override
+	public <U> Promise<U> reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) {
+		Deferred<U> d = new Deferred<>();
+		AtomicReference<U> iden = new AtomicReference<>(identity);
+
+		updateNext(event -> {
+			try {
+				switch(event.getType()) {
+					case DATA:
+						iden.updateAndGet((e) -> accumulator.apply(e, event.getData()));
+						return CONTINUE;
+					case CLOSE:
+						d.resolve(iden.get());
+						break;
+					case ERROR:
+						d.fail(event.getFailure());
+						break;
+				}
+				close(event.nodata());
+				return ABORT;
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		begin();
+		return d.getPromise();
+	}
+
+	@Override
+	public <R, A> Promise<R> collect(Collector<? super T, A, R> collector) {
+		A result = collector.supplier().get();
+		BiConsumer<A, ? super T> accumulator = collector.accumulator();
+		Deferred<R> d = new Deferred<>();
+		PushEventConsumer<T> consumer;
+
+		if (collector.characteristics()
+				.contains(Collector.Characteristics.CONCURRENT)) {
+			consumer = event -> {
+				try {
+					switch (event.getType()) {
+						case DATA :
+							accumulator.accept(result, event.getData());
+							return CONTINUE;
+						case CLOSE :
+							d.resolve(collector.finisher().apply(result));
+							break;
+						case ERROR :
+							d.fail(event.getFailure());
+							break;
+					}
+					close(event.nodata());
+					return ABORT;
+				} catch (Exception e) {
+					close(PushEvent.error(e));
+					return ABORT;
+				}
+			};
+		} else {
+			consumer = event -> {
+				try {
+					switch (event.getType()) {
+						case DATA :
+							synchronized (result) {
+								accumulator.accept(result, event.getData());
+							}
+							return CONTINUE;
+						case CLOSE :
+							d.resolve(collector.finisher().apply(result));
+							break;
+						case ERROR :
+							d.fail(event.getFailure());
+							break;
+					}
+					close(event.nodata());
+					return ABORT;
+				} catch (Exception e) {
+					close(PushEvent.error(e));
+					return ABORT;
+				}
+			};
+		}
+
+		updateNext(consumer);
+		begin();
+		return d.getPromise();
+	}
+
+	@Override
+	public Promise<Optional<T>> min(Comparator<? super T> comparator)  {
+		return reduce((a, b) -> comparator.compare(a, b) <= 0 ? a : b);
+	}
+
+	@Override
+	public Promise<Optional<T>> max(Comparator<? super T> comparator) {
+		return reduce((a, b) -> comparator.compare(a, b) > 0 ? a : b);
+	}
+
+	@Override
+	public Promise<Long> count() {
+		Deferred<Long> d = new Deferred<>();
+		LongAdder counter = new LongAdder();
+		updateNext((event) -> {
+				try {
+					switch(event.getType()) {
+						case DATA:
+						counter.add(1);
+							return CONTINUE;
+						case CLOSE:
+						d.resolve(Long.valueOf(counter.sum()));
+							break;
+						case ERROR:
+							d.fail(event.getFailure());
+							break;
+					}
+					close(event.nodata());
+					return ABORT;
+				} catch (Exception e) {
+				close(PushEvent.error(e));
+					return ABORT;
+				}
+			});
+		begin();
+		return d.getPromise();
+	}
+
+	@Override
+	public Promise<Boolean> anyMatch(Predicate<? super T> predicate) {
+		return filter(predicate).findAny()
+			.map(Optional::isPresent);
+	}
+
+	@Override
+	public Promise<Boolean> allMatch(Predicate<? super T> predicate) {
+		return filter(x -> !predicate.test(x)).findAny()
+				.map(o -> Boolean.valueOf(!o.isPresent()));
+	}
+
+	@Override
+	public Promise<Boolean> noneMatch(Predicate<? super T> predicate) {
+		return filter(predicate).findAny()
+				.map(o -> Boolean.valueOf(!o.isPresent()));
+	}
+
+	@Override
+	public Promise<Optional<T>> findFirst() {
+		Deferred<Optional<T>> d = new Deferred<>();
+		updateNext((event) -> {
+				try {
+					Optional<T> o = null;
+					switch(event.getType()) {
+						case DATA:
+							o = Optional.of(event.getData());
+							break;
+						case CLOSE:
+							o = Optional.empty();
+							break;
+						case ERROR:
+							d.fail(event.getFailure());
+							return ABORT;
+					}
+					if(!d.getPromise().isDone())
+						d.resolve(o);
+					return ABORT;
+				} catch (Exception e) {
+				close(PushEvent.error(e));
+					return ABORT;
+				}
+			});
+		begin();
+		return d.getPromise();
+	}
+
+	@Override
+	public Promise<Optional<T>> findAny() {
+		return findFirst();
+	}
+
+	@Override
+	public Promise<Long> forEachEvent(PushEventConsumer< ? super T> action) {
+		Deferred<Long> d = new Deferred<>();
+		LongAdder la = new LongAdder();
+		updateNext((event) -> {
+			try {
+				switch(event.getType()) {
+					case DATA:
+						long value = action.accept(event);
+						la.add(value);
+						return value;
+					case CLOSE:
+						try {
+							action.accept(event);
+						} finally {
+							d.resolve(Long.valueOf(la.sum()));
+						}
+						break;
+					case ERROR:
+						try {
+							action.accept(event);
+						} finally {
+							d.fail(event.getFailure());
+						}
+						break;
+				}
+				return ABORT;
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		begin();
+		return d.getPromise();
+	}
+
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/BufferBuilder.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/BufferBuilder.java
new file mode 100644
index 0000000..2aa6ec7
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/BufferBuilder.java
@@ -0,0 +1,79 @@
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+
+/**
+ * Create a buffered section of a Push-based stream
+ *
+ * @param <R> The type of object being built
+ * @param <T> The type of objects in the {@link PushEvent}
+ * @param <U> The type of the Queue used in the user specified buffer
+ */
+public interface BufferBuilder<R, T, U extends BlockingQueue<PushEvent<? extends T>>> {
+
+	/**
+	 * The BlockingQueue implementation to use as a buffer
+	 * 
+	 * @param queue
+	 * @return this builder
+	 */
+	BufferBuilder<R, T, U> withBuffer(U queue);
+
+	/**
+	 * Set the {@link QueuePolicy} of this Builder
+	 * 
+	 * @param queuePolicy
+	 * @return this builder
+	 */
+	BufferBuilder<R,T,U> withQueuePolicy(QueuePolicy<T,U> queuePolicy);
+
+	/**
+	 * Set the {@link QueuePolicy} of this Builder
+	 * 
+	 * @param queuePolicyOption
+	 * @return this builder
+	 */
+	BufferBuilder<R, T, U> withQueuePolicy(QueuePolicyOption queuePolicyOption);
+
+	/**
+	 * Set the {@link PushbackPolicy} of this builder
+	 * 
+	 * @param pushbackPolicy
+	 * @return this builder
+	 */
+	BufferBuilder<R, T, U> withPushbackPolicy(PushbackPolicy<T, U> pushbackPolicy);
+
+	/**
+	 * Set the {@link PushbackPolicy} of this builder
+	 * 
+	 * @param pushbackPolicyOption
+	 * @param time
+	 * @return this builder
+	 */
+	BufferBuilder<R, T, U> withPushbackPolicy(PushbackPolicyOption pushbackPolicyOption, long time);
+
+	/**
+	 * Set the maximum permitted number of concurrent event deliveries allowed
+	 * from this buffer
+	 * 
+	 * @param parallelism
+	 * @return this builder
+	 */
+	BufferBuilder<R, T, U> withParallelism(int parallelism);
+
+	/**
+	 * Set the {@link Executor} that should be used to deliver events from this
+	 * buffer
+	 * 
+	 * @param executor
+	 * @return this builder
+	 */
+	BufferBuilder<R, T, U> withExecutor(Executor executor);
+	
+	/**
+	 * @return the object being built
+	 */
+	R create();
+
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java
new file mode 100644
index 0000000..7cedafb
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java
@@ -0,0 +1,111 @@
+package org.osgi.util.pushstream;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.CLOSED;
+import static org.osgi.util.pushstream.PushEventConsumer.ABORT;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
+		extends UnbufferedPushStreamImpl<T,U> implements PushStream<T> {
+	
+	private final U eventQueue;
+	
+	private final Semaphore semaphore;
+	
+	private final Executor worker;
+	
+	private final QueuePolicy<T, U> queuePolicy;
+
+	private final PushbackPolicy<T, U> pushbackPolicy;
+	
+	/**
+	 * Indicates that a terminal event has been received, that we should stop
+	 * collecting new events, and that we must drain the buffer before
+	 * continuing
+	 */
+	private final AtomicBoolean			softClose	= new AtomicBoolean();
+
+	private final int					parallelism;
+
+	BufferedPushStreamImpl(PushStreamProvider psp,
+			ScheduledExecutorService scheduler, U eventQueue,
+			int parallelism, Executor worker, QueuePolicy<T,U> queuePolicy,
+			PushbackPolicy<T,U> pushbackPolicy,
+			Function<PushEventConsumer<T>,AutoCloseable> connector) {
+		super(psp, worker, scheduler, connector);
+		this.eventQueue = eventQueue;
+		this.parallelism = parallelism;
+		this.semaphore = new Semaphore(parallelism);
+		this.worker = worker;
+		this.queuePolicy = queuePolicy;
+		this.pushbackPolicy = pushbackPolicy;
+	}
+
+	@Override
+	protected long handleEvent(PushEvent< ? extends T> event) {
+
+		// If we have already been soft closed, or hard closed then abort
+		if (!softClose.compareAndSet(false, event.isTerminal())
+				|| closed.get() == CLOSED) {
+			return ABORT;
+		}
+
+		try {
+			queuePolicy.doOffer(eventQueue, event);
+			long backPressure = pushbackPolicy.pushback(eventQueue);
+			if(backPressure < 0) {
+				close();
+				return ABORT;
+			}
+			if(semaphore.tryAcquire()) {
+				startWorker();
+			}
+			return backPressure;
+		} catch (Exception e) {
+			close(PushEvent.error(e));
+			return ABORT;
+		}
+	}
+
+	private void startWorker() {
+		worker.execute(() -> {
+			try {
+				PushEvent< ? extends T> event;
+				while ((event = eventQueue.poll()) != null) {
+					if (event.isTerminal()) {
+						// Wait for the other threads to finish
+						semaphore.acquire(parallelism - 1);
+					}
+
+					long backpressure = super.handleEvent(event);
+					if(backpressure < 0) {
+						close();
+						return;
+					} else if(backpressure > 0) {
+						scheduler.schedule(this::startWorker, backpressure,
+								MILLISECONDS);
+						return;
+					}
+				}
+
+				semaphore.release();
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+			}
+			if(eventQueue.peek() != null && semaphore.tryAcquire()) {
+				try {
+					startWorker();
+				} catch (Exception e) {
+					close(PushEvent.error(e));
+				}
+			}
+		});
+		
+	}
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java
new file mode 100644
index 0000000..3a4da2f
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java
@@ -0,0 +1,35 @@
+package org.osgi.util.pushstream;
+
+import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.*;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+class IntermediatePushStreamImpl<T> extends AbstractPushStreamImpl<T>
+		implements PushStream<T> {
+	
+	private final AbstractPushStreamImpl< ? > previous;
+	
+	IntermediatePushStreamImpl(PushStreamProvider psp,
+			Executor executor, ScheduledExecutorService scheduler,
+			AbstractPushStreamImpl< ? > previous) {
+		super(psp, executor, scheduler);
+		this.previous = previous;
+	}
+
+	@Override
+	protected boolean begin() {
+		if(closed.compareAndSet(BUILDING, STARTED)) {
+			beginning();
+			previous.begin();
+			return true;
+		}
+		return false;
+	}
+
+	protected void beginning() {
+		// The base implementation has nothing to do, but
+		// this method is used in windowing
+	}
+	
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEvent.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEvent.java
new file mode 100644
index 0000000..028f0a3
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEvent.java
@@ -0,0 +1,205 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import static org.osgi.util.pushstream.PushEvent.EventType.*;
+
+/**
+ * A PushEvent is an immutable object that is transferred through a
+ * communication channel to push information to a downstream consumer. The event
+ * has three different types:
+ * <ul>
+ * <li>{@link EventType#DATA} – Provides access to a typed data element in the
+ * stream.
+ * <li>{@link EventType#CLOSE} – The stream is closed. After receiving this
+ * event, no more events will follow.
+ * <li>{@link EventType#ERROR} – The stream ran into an unrecoverable problem
+ * and is sending the reason downstream. The stream is closed and no more events
+ * will follow after this event.
+ * </ul>
+ *
+ * @param <T> The payload type of the event.
+ * @Immutable
+ */
+public abstract class PushEvent<T> {
+
+	/**
+	 * The type of a {@link PushEvent}.
+	 */
+	public static enum EventType {
+		/**
+		 * A data event forming part of the stream
+		 */
+		DATA,
+		/**
+		 * An error event that indicates streaming has failed and that no more
+		 * events will arrive
+		 */
+		ERROR,
+		/**
+		 * An event that indicates that the stream has terminated normally
+		 */
+		CLOSE
+	}
+
+	/**
+	 * Package private default constructor.
+	 */
+	PushEvent() {}
+
+	/**
+	 * Get the type of this event.
+	 * 
+	 * @return The type of this event.
+	 */
+	public abstract EventType getType();
+
+	/**
+	 * Return the data for this event.
+	 * 
+	 * @return The data payload.
+	 * @throws IllegalStateException if this event is not a
+	 *             {@link EventType#DATA} event.
+	 */
+	public T getData() throws IllegalStateException {
+		throw new IllegalStateException(
+				"Not a DATA event, the event type is " + getType());
+	}
+
+	/**
+	 * Return the error that terminated the stream.
+	 * 
+	 * @return The error that terminated the stream.
+	 * @throws IllegalStateException if this event is not an
+	 *             {@link EventType#ERROR} event.
+	 */
+	public Exception getFailure() throws IllegalStateException {
+		throw new IllegalStateException(
+				"Not an ERROR event, the event type is " + getType());
+	}
+
+	/**
+	 * Answer if no more events will follow after this event.
+	 * 
+	 * @return {@code false} if this is a data event, otherwise {@code true}.
+	 */
+	public boolean isTerminal() {
+		return true;
+	}
+
+	/**
+	 * Create a new data event.
+	 * 
+	 * @param <T> The payload type.
+	 * @param payload The payload.
+	 * @return A new data event wrapping the specified payload.
+	 */
+	public static <T> PushEvent<T> data(T payload) {
+		return new DataEvent<T>(payload);
+	}
+
+	/**
+	 * Create a new error event.
+	 * 
+	 * @param <T> The payload type.
+	 * @param e The error.
+	 * @return A new error event with the specified error.
+	 */
+	public static <T> PushEvent<T> error(Exception e) {
+		return new ErrorEvent<T>(e);
+	}
+
+	/**
+	 * Create a new close event.
+	 * 
+	 * @param <T> The payload type.
+	 * @return A new close event.
+	 */
+	public static <T> PushEvent<T> close() {
+		return new CloseEvent<T>();
+	}
+
+	/**
+	 * Convenience to cast a close/error event to another payload type. Since
+	 * the payload type is not needed for these events this is harmless. This
+	 * therefore allows you to forward the close/error event downstream without
+	 * creating anew event.
+	 * 
+	 * @param <X> The new payload type.
+	 * @return The current error or close event mapped to a new payload type.
+	 * @throws IllegalStateException if the event is a {@link EventType#DATA}
+	 *             event.
+	 */
+	public <X> PushEvent<X> nodata() throws IllegalStateException {
+		@SuppressWarnings("unchecked")
+		PushEvent<X> result = (PushEvent<X>) this;
+		return result;
+	}
+
+	static final class DataEvent<T> extends PushEvent<T> {
+		private final T data;
+
+		DataEvent(T data) {
+			this.data = data;
+		}
+
+		@Override
+		public T getData() throws IllegalStateException {
+			return data;
+		}
+
+		@Override
+		public EventType getType() {
+			return DATA;
+		}
+
+		@Override
+		public boolean isTerminal() {
+			return false;
+		}
+
+		@Override
+		public <X> PushEvent<X> nodata() throws IllegalStateException {
+			throw new IllegalStateException("This event is a DATA event");
+		}
+	}
+
+	static final class ErrorEvent<T> extends PushEvent<T> {
+		private final Exception error;
+
+		ErrorEvent(Exception error) {
+			this.error = error;
+		}
+
+		@Override
+		public Exception getFailure() {
+			return error;
+		}
+
+		@Override
+		public EventType getType() {
+			return ERROR;
+		}
+	}
+
+	static final class CloseEvent<T> extends PushEvent<T> {
+		@Override
+		public EventType getType() {
+			return CLOSE;
+		}
+	}
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEventConsumer.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEventConsumer.java
new file mode 100644
index 0000000..43de152
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEventConsumer.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import org.osgi.annotation.versioning.ConsumerType;
+
+/**
+ * An Async Event Consumer asynchronously receives Data events until it receives
+ * either a Close or Error event.
+ * 
+ * @param <T>
+ *            The type for the event payload
+ */
+@ConsumerType
+@FunctionalInterface
+public interface PushEventConsumer<T> {
+
+	/**
+	 * If ABORT is used as return value, the sender should close the channel all
+	 * the way to the upstream source. The ABORT will not guarantee that no
+	 * more events are delivered since this is impossible in a concurrent
+	 * environment. The consumer should accept subsequent events and close/clean
+	 * up when the Close or Error event is received.
+	 * 
+	 * Though ABORT has the value -1, any value less than 0 will act as an
+	 * abort.
+	 */
+	long	ABORT		= -1;
+
+	/**
+	 * A 0 indicates that the consumer is willing to receive subsequent events
+	 * at full speeds.
+	 * 
+	 * Any value more than 0 will indicate that the consumer is becoming
+	 * overloaded and wants a delay of the given milliseconds before the next
+	 * event is sent. This allows the consumer to pushback the event delivery
+	 * speed.
+	 */
+	long	CONTINUE	= 0;
+
+	/**
+	 * Accept an event from a source. Events can be delivered on multiple
+	 * threads simultaneously. However, Close and Error events are the last
+	 * events received, no more events must be sent after them.
+	 * 
+	 * @param event The event
+	 * @return less than 0 means abort, 0 means continue, more than 0 means
+	 *         delay ms
+	 * @throws Exception to indicate that an error has occured and that no
+	 *         further events should be delivered to this
+	 *         {@link PushEventConsumer}
+	 */
+	long accept(PushEvent<? extends T> event) throws Exception;
+
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEventSource.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEventSource.java
new file mode 100644
index 0000000..d43399d
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushEventSource.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import org.osgi.annotation.versioning.ConsumerType;
+
+/**
+ * An event source. An event source can open a channel between a source and a
+ * consumer. Once the channel is opened (even before it returns) the source can
+ * send events to the consumer.
+ *
+ * A source should stop sending and automatically close the channel when sending
+ * an event returns a negative value, see {@link PushEventConsumer#ABORT}.
+ * Values that are larger than 0 should be treated as a request to delay the
+ * next events with those number of milliseconds.
+ * 
+ * @param <T>
+ *            The payload type
+ */
+@ConsumerType
+@FunctionalInterface
+public interface PushEventSource<T> {
+
+	/**
+	 * Open the asynchronous channel between the source and the consumer. The
+	 * call returns an {@link AutoCloseable}. This can be closed, and should
+	 * close the channel, including sending a Close event if the channel was not
+	 * already closed. The returned object must be able to be closed multiple
+	 * times without sending more than one Close events.
+	 * 
+	 * @param aec the consumer (not null)
+	 * @return a {@link AutoCloseable} that can be used to close the stream
+	 * @throws Exception
+	 */
+	AutoCloseable open(PushEventConsumer< ? super T> aec) throws Exception;
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStream.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStream.java
new file mode 100644
index 0000000..c26bc8c
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStream.java
@@ -0,0 +1,609 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import java.util.function.IntSupplier;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+
+import org.osgi.annotation.versioning.ProviderType;
+import org.osgi.util.promise.Promise;
+import org.osgi.util.promise.TimeoutException;
+
+/**
+ * A Push Stream fulfills the same role as the Java 8 stream but it reverses the
+ * control direction. The Java 8 stream is pull based and this is push based. A
+ * Push Stream makes it possible to build a pipeline of transformations using a
+ * builder kind of model. Just like streams, it provides a number of terminating
+ * methods that will actually open the channel and perform the processing until
+ * the channel is closed (The source sends a Close event). The results of the
+ * processing will be send to a Promise, just like any error events. A stream
+ * can be used multiple times. The Push Stream represents a pipeline. Upstream
+ * is in the direction of the source, downstream is in the direction of the
+ * terminating method. Events are sent downstream asynchronously with no
+ * guarantee for ordering or concurrency. Methods are available to provide
+ * serialization of the events and splitting in background threads.
+ * 
+ * @param <T> The Payload type
+ */
+@ProviderType
+public interface PushStream<T> extends AutoCloseable {
+
+	/**
+	 * Must be run after the channel is closed. This handler will run after the
+	 * downstream methods have processed the close event and before the upstream
+	 * methods have closed.
+	 * 
+	 * @param closeHandler Will be called on close
+	 * @return This stream
+	 */
+	PushStream<T> onClose(Runnable closeHandler);
+
+	/**
+	 * Must be run after the channel is closed. This handler will run after the
+	 * downstream methods have processed the close event and before the upstream
+	 * methods have closed.
+	 * 
+	 * @param closeHandler Will be called on close
+	 * @return This stream
+	 */
+	PushStream<T> onError(Consumer< ? super Throwable> closeHandler);
+
+	/**
+	 * Only pass events downstream when the predicate tests true.
+	 * 
+	 * @param predicate The predicate that is tested (not null)
+	 * @return Builder style (can be a new or the same object)
+	 */
+	PushStream<T> filter(Predicate< ? super T> predicate);
+
+	/**
+	 * Map a payload value.
+	 * 
+	 * @param mapper The map function
+	 * @return Builder style (can be a new or the same object)
+	 */
+	<R> PushStream<R> map(Function< ? super T, ? extends R> mapper);
+
+	/**
+	 * Flat map the payload value (turn one event into 0..n events of
+	 * potentially another type).
+	 * 
+	 * @param mapper The flat map function
+	 * @return Builder style (can be a new or the same object)
+	 */
+	<R> PushStream<R> flatMap(
+			Function< ? super T, ? extends PushStream< ? extends R>> mapper);
+
+	/**
+	 * Remove any duplicates. Notice that this can be expensive in a large
+	 * stream since it must track previous payloads.
+	 * 
+	 * @return Builder style (can be a new or the same object)
+	 */
+	PushStream<T> distinct();
+
+	/**
+	 * Sorted the elements, assuming that T extends Comparable. This is of
+	 * course expensive for large or infinite streams since it requires
+	 * buffering the stream until close.
+	 * 
+	 * @return Builder style (can be a new or the same object)
+	 */
+	PushStream<T> sorted();
+
+	/**
+	 * Sorted the elements with the given comparator. This is of course
+	 * expensive for large or infinite streams since it requires buffering the
+	 * stream until close.
+	 * 
+	 * @param comparator
+	 * @return Builder style (can be a new or the same object)
+	 */
+	PushStream<T> sorted(Comparator< ? super T> comparator);
+
+	/**
+	 * Automatically close the channel after the maxSize number of elements is
+	 * received.
+	 * 
+	 * @param maxSize Maximum number of elements has been received
+	 * @return Builder style (can be a new or the same object)
+	 */
+	PushStream<T> limit(long maxSize);
+
+	/**
+	 * Automatically close the channel after the given amount of time has
+	 * elapsed.
+	 * 
+	 * @param maxTime The maximum time that the stream should remain open
+	 * @return Builder style (can be a new or the same object)
+	 */
+	PushStream<T> limit(Duration maxTime);
+
+	/**
+	 * Automatically fail the channel if no events are received for the
+	 * indicated length of time. If the timeout is reached then a failure event
+	 * containing a {@link TimeoutException} will be sent.
+	 * 
+	 * @param idleTime The length of time that the stream should remain open
+	 *            when no events are being received.
+	 * @return Builder style (can be a new or the same object)
+	 */
+	PushStream<T> timeout(Duration idleTime);
+
+	/**
+	 * Skip a number of events in the channel.
+	 * 
+	 * @param n number of elements to skip
+	 * @throws IllegalArgumentException if the number of events to skip is
+	 *             negative
+	 * @return Builder style (can be a new or the same object)
+	 */
+	PushStream<T> skip(long n);
+
+	/**
+	 * Execute the downstream events in up to n background threads. If more
+	 * requests are outstanding apply delay * nr of delayed threads back
+	 * pressure. A downstream channel that is closed or throws an exception will
+	 * cause all execution to cease and the stream to close
+	 * 
+	 * @param n number of simultaneous background threads to use
+	 * @param delay Nr of ms/thread that is queued back pressure
+	 * @param e an executor to use for the background threads.
+	 * @return Builder style (can be a new or the same object)
+	 * @throws IllegalArgumentException if the number of threads is < 1 or the
+	 *             delay is < 0
+	 * @throws NullPointerException if the Executor is null
+	 */
+	PushStream<T> fork(int n, int delay, Executor e)
+			throws IllegalArgumentException, NullPointerException;
+
+	/**
+	 * Buffer the events in a queue using default values for the queue size and
+	 * other behaviours. Buffered work will be processed asynchronously in the
+	 * rest of the chain. Buffering also blocks the transmission of back
+	 * pressure to previous elements in the chain, although back pressure is
+	 * honoured by the buffer.
+	 * <p>
+	 * Buffers are useful for "bursty" event sources which produce a number of
+	 * events close together, then none for some time. These bursts can
+	 * sometimes overwhelm downstream event consumers. Buffering will not,
+	 * however, protect downstream components from a source which produces
+	 * events faster than they can be consumed. For fast sources
+	 * {@link #filter(Predicate)} and {@link #coalesce(int, Function)}
+	 * {@link #fork(int, int, Executor)} are better choices.
+	 * 
+	 * @return Builder style (can be a new or the same object)
+	 */
+	PushStream<T> buffer();
+
+	/**
+	 * Build a buffer to enqueue events in a queue using custom values for the
+	 * queue size and other behaviours. Buffered work will be processed
+	 * asynchronously in the rest of the chain. Buffering also blocks the
+	 * transmission of back pressure to previous elements in the chain, although
+	 * back pressure is honoured by the buffer.
+	 * <p>
+	 * Buffers are useful for "bursty" event sources which produce a number of
+	 * events close together, then none for some time. These bursts can
+	 * sometimes overwhelm downstream event consumers. Buffering will not,
+	 * however, protect downstream components from a source which produces
+	 * events faster than they can be consumed. For fast sources
+	 * {@link #filter(Predicate)} and {@link #coalesce(int, Function)}
+	 * {@link #fork(int, int, Executor)} are better choices.
+	 * <p>
+	 * Buffers are also useful as "circuit breakers" in the pipeline. If a
+	 * {@link QueuePolicyOption#FAIL} is used then a full buffer will trigger
+	 * the stream to close, preventing an event storm from reaching the client.
+	 * 
+	 * @param parallelism
+	 * @param executor
+	 * @param queue
+	 * @param queuePolicy
+	 * @param pushbackPolicy
+	 * @return Builder style (can be a new or the same object)
+	 */
+	<U extends BlockingQueue<PushEvent< ? extends T>>> PushStreamBuilder<T,U> buildBuffer();
+
+	/**
+	 * Merge in the events from another source. The resulting channel is not
+	 * closed until this channel and the channel from the source are closed.
+	 * 
+	 * @param source The source to merge in.
+	 * @return Builder style (can be a new or the same object)
+	 */
+	PushStream<T> merge(PushEventSource< ? extends T> source);
+
+	/**
+	 * Merge in the events from another PushStream. The resulting channel is not
+	 * closed until this channel and the channel from the source are closed.
+	 * 
+	 * @param source The source to merge in.
+	 * @return Builder style (can be a new or the same object)
+	 */
+	PushStream<T> merge(PushStream< ? extends T> source);
+
+	/**
+	 * Split the events to different streams based on a predicate. If the
+	 * predicate is true, the event is dispatched to that channel on the same
+	 * position. All predicates are tested for every event.
+	 * <p>
+	 * This method differs from other methods of AsyncStream in three
+	 * significant ways:
+	 * <ul>
+	 * <li>The return value contains multiple streams.</li>
+	 * <li>This stream will only close when all of these child streams have
+	 * closed.</li>
+	 * <li>Event delivery is made to all open children that accept the event.
+	 * </li>
+	 * </ul>
+	 * 
+	 * @param predicates the predicates to test
+	 * @return streams that map to the predicates
+	 */
+	@SuppressWarnings("unchecked")
+	PushStream<T>[] split(Predicate< ? super T>... predicates);
+
+	/**
+	 * Ensure that any events are delivered sequentially. That is, no
+	 * overlapping calls downstream. This can be used to turn a forked stream
+	 * (where for example a heavy conversion is done in multiple threads) back
+	 * into a sequential stream so a reduce is simple to do.
+	 * 
+	 * @return Builder style (can be a new or the same object)
+	 */
+	PushStream<T> sequential();
+
+	/**
+	 * Coalesces a number of events into a new type of event. The input events
+	 * are forwarded to a accumulator function. This function returns an
+	 * Optional. If the optional is present, it's value is send downstream,
+	 * otherwise it is ignored.
+	 * 
+	 * @param f
+	 * @return Builder style (can be a new or the same object)
+	 */
+	<R> PushStream<R> coalesce(Function< ? super T,Optional<R>> f);
+
+	/**
+	 * Coalesces a number of events into a new type of event. A fixed number of
+	 * input events are forwarded to a accumulator function. This function
+	 * returns new event data to be forwarded on.
+	 * 
+	 * @param count
+	 * @param f
+	 * @return Builder style (can be a new or the same object)
+	 */
+	public <R> PushStream<R> coalesce(int count, Function<Collection<T>,R> f);
+
+	/**
+	 * Coalesces a number of events into a new type of event. A variable number
+	 * of input events are forwarded to a accumulator function. The number of
+	 * events to be forwarded is determined by calling the count function. The
+	 * accumulator function then returns new event data to be forwarded on.
+	 * 
+	 * @param count
+	 * @param f
+	 * @return Builder style (can be a new or the same object)
+	 */
+	public <R> PushStream<R> coalesce(IntSupplier count,
+			Function<Collection<T>,R> f);
+
+	/**
+	 * Buffers a number of events over a fixed time interval and then forwards
+	 * the events to an accumulator function. This function returns new event
+	 * data to be forwarded on. Note that:
+	 * <ul>
+	 * <li>The collection forwarded to the accumulator function will be empty if
+	 * no events arrived during the time interval.</li>
+	 * <li>The accumulator function will be run and the forwarded event
+	 * delivered as a different task, (and therefore potentially on a different
+	 * thread) from the one that delivered the event to this {@link PushStream}.
+	 * </li>
+	 * <li>Due to the buffering and asynchronous delivery required, this method
+	 * prevents the propagation of back-pressure to earlier stages</li>
+	 * </ul>
+	 * 
+	 * @param d
+	 * @param f
+	 * @return Builder style (can be a new or the same object)
+	 */
+	<R> PushStream<R> window(Duration d, Function<Collection<T>,R> f);
+
+	/**
+	 * Buffers a number of events over a fixed time interval and then forwards
+	 * the events to an accumulator function. This function returns new event
+	 * data to be forwarded on. Note that:
+	 * <ul>
+	 * <li>The collection forwarded to the accumulator function will be empty if
+	 * no events arrived during the time interval.</li>
+	 * <li>The accumulator function will be run and the forwarded event
+	 * delivered by a task given to the supplied executor.</li>
+	 * <li>Due to the buffering and asynchronous delivery required, this method
+	 * prevents the propagation of back-pressure to earlier stages</li>
+	 * </ul>
+	 * 
+	 * @param d
+	 * @param executor
+	 * @param f
+	 * @return Builder style (can be a new or the same object)
+	 */
+	<R> PushStream<R> window(Duration d, Executor executor,
+			Function<Collection<T>,R> f);
+
+	/**
+	 * Buffers a number of events over a variable time interval and then
+	 * forwards the events to an accumulator function. The length of time over
+	 * which events are buffered is determined by the time function. A maximum
+	 * number of events can also be requested, if this number of events is
+	 * reached then the accumulator will be called early. The accumulator
+	 * function returns new event data to be forwarded on. It is also given the
+	 * length of time for which the buffer accumulated data. This may be less
+	 * than the requested interval if the buffer reached the maximum number of
+	 * requested events early. Note that:
+	 * <ul>
+	 * <li>The collection forwarded to the accumulator function will be empty if
+	 * no events arrived during the time interval.</li>
+	 * <li>The accumulator function will be run and the forwarded event
+	 * delivered as a different task, (and therefore potentially on a different
+	 * thread) from the one that delivered the event to this {@link PushStream}.
+	 * </li>
+	 * <li>Due to the buffering and asynchronous delivery required, this method
+	 * prevents the propagation of back-pressure to earlier stages</li>
+	 * <li>If the window finishes by hitting the maximum number of events then
+	 * the remaining time in the window will be applied as back-pressure to the
+	 * previous stage, attempting to slow the producer to the expected windowing
+	 * threshold.</li>
+	 * </ul>
+	 * 
+	 * @param timeSupplier
+	 * @param maxEvents
+	 * @param f
+	 * @return Builder style (can be a new or the same object)
+	 */
+	<R> PushStream<R> window(Supplier<Duration> timeSupplier,
+			IntSupplier maxEvents, BiFunction<Long,Collection<T>,R> f);
+
+	/**
+	 * Buffers a number of events over a variable time interval and then
+	 * forwards the events to an accumulator function. The length of time over
+	 * which events are buffered is determined by the time function. A maximum
+	 * number of events can also be requested, if this number of events is
+	 * reached then the accumulator will be called early. The accumulator
+	 * function returns new event data to be forwarded on. It is also given the
+	 * length of time for which the buffer accumulated data. This may be less
+	 * than the requested interval if the buffer reached the maximum number of
+	 * requested events early. Note that:
+	 * <ul>
+	 * <li>The collection forwarded to the accumulator function will be empty if
+	 * no events arrived during the time interval.</li>
+	 * <li>The accumulator function will be run and the forwarded event
+	 * delivered as a different task, (and therefore potentially on a different
+	 * thread) from the one that delivered the event to this {@link PushStream}.
+	 * </li>
+	 * <li>If the window finishes by hitting the maximum number of events then
+	 * the remaining time in the window will be applied as back-pressure to the
+	 * previous stage, attempting to slow the producer to the expected windowing
+	 * threshold.</li>
+	 * </ul>
+	 * 
+	 * @param timeSupplier
+	 * @param maxEvents
+	 * @param executor
+	 * @param f
+	 * @return Builder style (can be a new or the same object)
+	 */
+	<R> PushStream<R> window(Supplier<Duration> timeSupplier,
+			IntSupplier maxEvents, Executor executor,
+			BiFunction<Long,Collection<T>,R> f);
+
+	/**
+	 * Execute the action for each event received until the channel is closed.
+	 * This is a terminating method, the returned promise is resolved when the
+	 * channel closes.
+	 * <p>
+	 * This is a <strong>terminal operation</strong>
+	 * 
+	 * @param action The action to perform
+	 * @return A promise that is resolved when the channel closes.
+	 */
+	Promise<Void> forEach(Consumer< ? super T> action);
+
+	/**
+	 * Collect the payloads in an Object array after the channel is closed. This
+	 * is a terminating method, the returned promise is resolved when the
+	 * channel is closed.
+	 * <p>
+	 * This is a <strong>terminal operation</strong>
+	 * 
+	 * @return A promise that is resolved with all the payloads received over
+	 *         the channel
+	 */
+	Promise<Object[]> toArray();
+
+	/**
+	 * Collect the payloads in an Object array after the channel is closed. This
+	 * is a terminating method, the returned promise is resolved when the
+	 * channel is closed. The type of the array is handled by the caller using a
+	 * generator function that gets the length of the desired array.
+	 * <p>
+	 * This is a <strong>terminal operation</strong>
+	 * 
+	 * @param generator
+	 * @return A promise that is resolved with all the payloads received over
+	 *         the channel
+	 */
+	<A extends T> Promise<A[]> toArray(IntFunction<A[]> generator);
+
+	/**
+	 * Standard reduce, see Stream. The returned promise will be resolved when
+	 * the channel closes.
+	 * <p>
+	 * This is a <strong>terminal operation</strong>
+	 * 
+	 * @param identity The identity/begin value
+	 * @param accumulator The accumulator
+	 * @return A
+	 */
+	Promise<T> reduce(T identity, BinaryOperator<T> accumulator);
+
+	/**
+	 * Standard reduce without identity, so the return is an Optional. The
+	 * returned promise will be resolved when the channel closes.
+	 * <p>
+	 * This is a <strong>terminal operation</strong>
+	 * 
+	 * @param accumulator The accumulator
+	 * @return an Optional
+	 */
+	Promise<Optional<T>> reduce(BinaryOperator<T> accumulator);
+
+	/**
+	 * Standard reduce with identity, accumulator and combiner. The returned
+	 * promise will be resolved when the channel closes.
+	 * <p>
+	 * This is a <strong>terminal operation</strong>
+	 * 
+	 * @param identity
+	 * @param accumulator
+	 * @param combiner combines to U's into one U (e.g. how combine two lists)
+	 * @return The promise
+	 */
+	<U> Promise<U> reduce(U identity, BiFunction<U, ? super T,U> accumulator,
+			BinaryOperator<U> combiner);
+
+	/**
+	 * See Stream. Will resolve onces the channel closes.
+	 * <p>
+	 * This is a <strong>terminal operation</strong>
+	 * 
+	 * @param collector
+	 * @return A Promise representing the collected results
+	 */
+	<R, A> Promise<R> collect(Collector< ? super T,A,R> collector);
+
+	/**
+	 * See Stream. Will resolve onces the channel closes.
+	 * <p>
+	 * This is a <strong>terminal operation</strong>
+	 * 
+	 * @param comparator
+	 * @return A Promise representing the minimum value, or null if no values
+	 *         are seen before the end of the stream
+	 */
+	Promise<Optional<T>> min(Comparator< ? super T> comparator);
+
+	/**
+	 * See Stream. Will resolve onces the channel closes.
+	 * <p>
+	 * This is a <strong>terminal operation</strong>
+	 * 
+	 * @param comparator
+	 * @return A Promise representing the maximum value, or null if no values
+	 *         are seen before the end of the stream
+	 */
+	Promise<Optional<T>> max(Comparator< ? super T> comparator);
+
+	/**
+	 * See Stream. Will resolve onces the channel closes.
+	 * <p>
+	 * This is a <strong>terminal operation</strong>
+	 * 
+	 * @return A Promise representing the number of values in the stream
+	 */
+	Promise<Long> count();
+
+	/**
+	 * Close the channel and resolve the promise with true when the predicate
+	 * matches a payload. If the channel is closed before the predicate matches,
+	 * the promise is resolved with false.
+	 * <p>
+	 * This is a <strong>short circuiting terminal operation</strong>
+	 * 
+	 * @param predicate
+	 * @return A Promise that will resolve when an event matches the predicate,
+	 *         or the end of the stream is reached
+	 */
+	Promise<Boolean> anyMatch(Predicate< ? super T> predicate);
+
+	/**
+	 * Closes the channel and resolve the promise with false when the predicate
+	 * does not matches a pay load.If the channel is closed before, the promise
+	 * is resolved with true.
+	 * <p>
+	 * This is a <strong>short circuiting terminal operation</strong>
+	 * 
+	 * @param predicate
+	 * @return A Promise that will resolve when an event fails to match the
+	 *         predicate, or the end of the stream is reached
+	 */
+	Promise<Boolean> allMatch(Predicate< ? super T> predicate);
+
+	/**
+	 * Closes the channel and resolve the promise with false when the predicate
+	 * matches any pay load. If the channel is closed before, the promise is
+	 * resolved with true.
+	 * <p>
+	 * This is a <strong>short circuiting terminal operation</strong>
+	 * 
+	 * @param predicate
+	 * @return A Promise that will resolve when an event matches the predicate,
+	 *         or the end of the stream is reached
+	 */
+	Promise<Boolean> noneMatch(Predicate< ? super T> predicate);
+
+	/**
+	 * Close the channel and resolve the promise with the first element. If the
+	 * channel is closed before, the Optional will have no value.
+	 * 
+	 * @return a promise
+	 */
+	Promise<Optional<T>> findFirst();
+
+	/**
+	 * Close the channel and resolve the promise with the first element. If the
+	 * channel is closed before, the Optional will have no value.
+	 * <p>
+	 * This is a <strong>terminal operation</strong>
+	 * 
+	 * @return a promise
+	 */
+	Promise<Optional<T>> findAny();
+
+	/**
+	 * Pass on each event to another consumer until the stream is closed.
+	 * <p>
+	 * This is a <strong>terminal operation</strong>
+	 * 
+	 * @param action
+	 * @return a promise
+	 */
+	Promise<Long> forEachEvent(PushEventConsumer< ? super T> action);
+
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilder.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilder.java
new file mode 100644
index 0000000..d59c8d9
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilder.java
@@ -0,0 +1,52 @@
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+
+/**
+ * A Builder for a PushStream. This Builder extends the support of a standard
+ * BufferBuilder by allowing the PushStream to be unbuffered.
+ * 
+ *
+ * @param <T> The type of objects in the {@link PushEvent}
+ * @param <U> The type of the Queue used in the user specified buffer
+ */
+public interface PushStreamBuilder<T, U extends BlockingQueue<PushEvent< ? extends T>>>
+		extends BufferBuilder<PushStream<T>,T,U> {
+
+	/**
+	 * Tells this {@link PushStreamBuilder} to create an unbuffered stream which
+	 * delivers events directly to its consumer using the incoming delivery
+	 * thread.
+	 * 
+	 * @return the builder
+	 */
+	PushStreamBuilder<T,U> unbuffered();
+
+	/*
+	 * Overridden methods to allow the covariant return of a PushStreamBuilder
+	 */
+
+	@Override
+	PushStreamBuilder<T,U> withBuffer(U queue);
+
+	@Override
+	PushStreamBuilder<T,U> withQueuePolicy(QueuePolicy<T,U> queuePolicy);
+
+	@Override
+	PushStreamBuilder<T,U> withQueuePolicy(QueuePolicyOption queuePolicyOption);
+
+	@Override
+	PushStreamBuilder<T,U> withPushbackPolicy(
+			PushbackPolicy<T,U> pushbackPolicy);
+
+	@Override
+	PushStreamBuilder<T,U> withPushbackPolicy(
+			PushbackPolicyOption pushbackPolicyOption, long time);
+
+	@Override
+	PushStreamBuilder<T,U> withParallelism(int parallelism);
+
+	@Override
+	PushStreamBuilder<T,U> withExecutor(Executor executor);
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java
new file mode 100644
index 0000000..5ec7cb3
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java
@@ -0,0 +1,88 @@
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+
+class PushStreamBuilderImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
+		extends AbstractBufferBuilder<PushStream<T>,T,U>
+		implements PushStreamBuilder<T,U> {
+
+	private final PushStreamProvider	psp;
+	private final PushEventSource<T>		eventSource;
+	private final Executor					previousExecutor;
+
+	private boolean							unbuffered;
+
+	PushStreamBuilderImpl(PushStreamProvider psp, Executor defaultExecutor,
+			PushEventSource<T> eventSource) {
+		this.psp = psp;
+		this.previousExecutor = defaultExecutor;
+		this.eventSource = eventSource;
+		this.worker = defaultExecutor;
+	}
+
+	@Override
+	public PushStreamBuilder<T,U> withBuffer(U queue) {
+		unbuffered = false;
+		return (PushStreamBuilder<T,U>) super.withBuffer(queue);
+	}
+
+	@Override
+	public PushStreamBuilder<T,U> withQueuePolicy(
+			QueuePolicy<T,U> queuePolicy) {
+		unbuffered = false;
+		return (PushStreamBuilder<T,U>) super.withQueuePolicy(queuePolicy);
+	}
+
+	@Override
+	public PushStreamBuilder<T,U> withQueuePolicy(
+			QueuePolicyOption queuePolicyOption) {
+		unbuffered = false;
+		return (PushStreamBuilder<T,U>) super.withQueuePolicy(
+				queuePolicyOption);
+	}
+
+	@Override
+	public PushStreamBuilder<T,U> withPushbackPolicy(
+			PushbackPolicy<T,U> pushbackPolicy) {
+		unbuffered = false;
+		return (PushStreamBuilder<T,U>) super.withPushbackPolicy(
+				pushbackPolicy);
+	}
+
+	@Override
+	public PushStreamBuilder<T,U> withPushbackPolicy(
+			PushbackPolicyOption pushbackPolicyOption, long time) {
+		unbuffered = false;
+		return (PushStreamBuilder<T,U>) super.withPushbackPolicy(
+				pushbackPolicyOption, time);
+	}
+
+	@Override
+	public PushStreamBuilder<T,U> withParallelism(int parallelism) {
+		unbuffered = false;
+		return (PushStreamBuilder<T,U>) super.withParallelism(parallelism);
+	}
+
+	@Override
+	public PushStreamBuilder<T,U> withExecutor(Executor executor) {
+		unbuffered = false;
+		return (PushStreamBuilder<T,U>) super.withExecutor(executor);
+	}
+
+	@Override
+	public PushStreamBuilder<T,U> unbuffered() {
+		unbuffered = true;
+		return this;
+	}
+
+	@Override
+	public PushStream<T> create() {
+		if (unbuffered) {
+			return psp.createUnbufferedStream(eventSource, previousExecutor);
+		} else {
+			return psp.createStream(eventSource, concurrency, worker, buffer,
+					bufferingPolicy, backPressure);
+		}
+	}
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamProvider.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamProvider.java
new file mode 100644
index 0000000..be87c6b
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushStreamProvider.java
@@ -0,0 +1,581 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.CLOSED;
+import static org.osgi.util.pushstream.PushEvent.*;
+import static org.osgi.util.pushstream.PushbackPolicyOption.LINEAR;
+import static org.osgi.util.pushstream.QueuePolicyOption.FAIL;
+
+import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+/**
+ * A factory for {@link PushStream} instances, and utility methods for handling
+ * {@link PushEventSource}s and {@link PushEventConsumer}s
+ */
+public final class PushStreamProvider {
+
+	private final Lock					lock	= new ReentrantLock(true);
+
+	private int							schedulerReferences;
+
+	private ScheduledExecutorService	scheduler;
+
+	private ScheduledExecutorService acquireScheduler() {
+		try {
+			lock.lockInterruptibly();
+			try {
+				schedulerReferences += 1;
+
+				if (schedulerReferences == 1) {
+					scheduler = Executors.newSingleThreadScheduledExecutor();
+				}
+				return scheduler;
+			} finally {
+				lock.unlock();
+			}
+		} catch (InterruptedException e) {
+			throw new IllegalStateException("Unable to acquire the Scheduler",
+					e);
+		}
+	}
+
+	private void releaseScheduler() {
+		try {
+			lock.lockInterruptibly();
+			try {
+				schedulerReferences -= 1;
+
+				if (schedulerReferences == 0) {
+					scheduler.shutdown();
+					scheduler = null;
+				}
+			} finally {
+				lock.unlock();
+			}
+		} catch (InterruptedException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+	}
+
+	/**
+	 * Create a stream with the default configured buffer, executor size, queue,
+	 * queue policy and pushback policy. This is equivalent to calling
+	 * 
+	 * <code>
+	 *   buildStream(source).create();
+	 * </code>
+	 * 
+	 * <p>
+	 * This stream will be buffered from the event producer, and will honour
+	 * back pressure even if the source does not.
+	 * 
+	 * <p>
+	 * Buffered streams are useful for "bursty" event sources which produce a
+	 * number of events close together, then none for some time. These bursts
+	 * can sometimes overwhelm downstream processors. Buffering will not,
+	 * however, protect downstream components from a source which produces
+	 * events faster (on average) than they can be consumed.
+	 * 
+	 * <p>
+	 * Event delivery will not begin until a terminal operation is reached on
+	 * the chain of AsyncStreams. Once a terminal operation is reached the
+	 * stream will be connected to the event source.
+	 * 
+	 * @param eventSource
+	 * @return A {@link PushStream} with a default initial buffer
+	 */
+	public <T> PushStream<T> createStream(PushEventSource<T> eventSource) {
+		return createStream(eventSource, 1, null, new ArrayBlockingQueue<>(32),
+				FAIL.getPolicy(), LINEAR.getPolicy(1000));
+	}
+	
+	/**
+	 * Builds a push stream with custom configuration.
+	 * 
+	 * <p>
+	 * 
+	 * The resulting {@link PushStream} may be buffered or unbuffered depending
+	 * on how it is configured.
+	 * 
+	 * @param eventSource The source of the events
+	 * 
+	 * @return A {@link PushStreamBuilder} for the stream
+	 */
+	public <T, U extends BlockingQueue<PushEvent< ? extends T>>> PushStreamBuilder<T,U> buildStream(
+			PushEventSource<T> eventSource) {
+		return new PushStreamBuilderImpl<T,U>(this, null, eventSource);
+	}
+	
+	@SuppressWarnings({
+			"rawtypes", "unchecked"
+	})
+	<T, U extends BlockingQueue<PushEvent< ? extends T>>> PushStream<T> createStream(
+			PushEventSource<T> eventSource, int parallelism, Executor executor,
+			U queue, QueuePolicy<T,U> queuePolicy,
+			PushbackPolicy<T,U> pushbackPolicy) {
+
+		if (eventSource == null) {
+			throw new NullPointerException("There is no source of events");
+		}
+
+		if (parallelism < 0) {
+			throw new IllegalArgumentException(
+					"The supplied parallelism cannot be less than zero. It was "
+							+ parallelism);
+		} else if (parallelism == 0) {
+			parallelism = 1;
+		}
+
+		boolean closeExecutorOnClose;
+		Executor toUse;
+		if (executor == null) {
+			toUse = Executors.newFixedThreadPool(parallelism);
+			closeExecutorOnClose = true;
+		} else {
+			toUse = executor;
+			closeExecutorOnClose = false;
+		}
+
+		if (queue == null) {
+			queue = (U) new ArrayBlockingQueue(32);
+		}
+
+		if (queuePolicy == null) {
+			queuePolicy = FAIL.getPolicy();
+		}
+
+		if (pushbackPolicy == null) {
+			pushbackPolicy = LINEAR.getPolicy(1000);
+		}
+
+		@SuppressWarnings("resource")
+		PushStream<T> stream = new BufferedPushStreamImpl<>(this,
+				acquireScheduler(), queue, parallelism, toUse, queuePolicy,
+				pushbackPolicy, aec -> {
+					try {
+						return eventSource.open(aec);
+					} catch (Exception e) {
+						throw new RuntimeException(
+								"Unable to connect to event source", e);
+					}
+				});
+
+		stream = stream.onClose(() -> {
+			if (closeExecutorOnClose) {
+				((ExecutorService) toUse).shutdown();
+			}
+			releaseScheduler();
+		}).map(Function.identity());
+		return stream;
+	}
+
+	<T> PushStream<T> createUnbufferedStream(PushEventSource<T> eventSource,
+			Executor executor) {
+
+		boolean closeExecutorOnClose;
+		Executor toUse;
+		if (executor == null) {
+			toUse = Executors.newFixedThreadPool(2);
+			closeExecutorOnClose = true;
+		} else {
+			toUse = executor;
+			closeExecutorOnClose = false;
+		}
+
+		@SuppressWarnings("resource")
+		PushStream<T> stream = new UnbufferedPushStreamImpl<>(this, toUse,
+				acquireScheduler(), aec -> {
+					try {
+						return eventSource.open(aec);
+					} catch (Exception e) {
+						throw new RuntimeException(
+								"Unable to connect to event source", e);
+					}
+				});
+
+		stream = stream.onClose(() -> {
+			if (closeExecutorOnClose) {
+				((ExecutorService) toUse).shutdown();
+			}
+			releaseScheduler();
+		}).map(Function.identity());
+
+		return stream;
+	}
+
+	/**
+	 * Convert an {@link PushStream} into an {@link PushEventSource}. The first
+	 * call to {@link PushEventSource#open(PushEventConsumer)} will begin event
+	 * processing.
+	 * 
+	 * The {@link PushEventSource} will remain active until the backing stream
+	 * is closed, and permits multiple consumers to
+	 * {@link PushEventSource#open(PushEventConsumer)} it.
+	 * 
+	 * This is equivalent to: <code>
+	 *   buildEventSourceFromStream(stream).create();
+	 * </code>
+	 * 
+	 * @param stream
+	 * @return a {@link PushEventSource} backed by the {@link PushStream}
+	 */
+	public <T> PushEventSource<T> createEventSourceFromStream(
+			PushStream<T> stream) {
+		return buildEventSourceFromStream(stream).create();
+	}
+
+	/**
+	 * Convert an {@link PushStream} into an {@link PushEventSource}. The first
+	 * call to {@link PushEventSource#open(PushEventConsumer)} will begin event
+	 * processing.
+	 * 
+	 * The {@link PushEventSource} will remain active until the backing stream
+	 * is closed, and permits multiple consumers to
+	 * {@link PushEventSource#open(PushEventConsumer)} it.
+	 * 
+	 * @param stream
+	 * 
+	 * @return a {@link PushEventSource} backed by the {@link PushStream}
+	 */
+	public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<PushEventSource<T>,T,U> buildEventSourceFromStream(
+			PushStream<T> stream) {
+		return new AbstractBufferBuilder<PushEventSource<T>,T,U>() {
+			@Override
+			public PushEventSource<T> create() {
+				SimplePushEventSource<T> spes = createSimplePushEventSource(
+						concurrency, worker, buffer, bufferingPolicy, () -> {
+							try {
+								stream.close();
+							} catch (Exception e) {
+								// TODO Auto-generated catch block
+								e.printStackTrace();
+							}
+						});
+				spes.connectPromise()
+						.then(p -> stream.forEach(t -> spes.publish(t))
+								.onResolve(() -> spes.close()));
+				return spes;
+			}
+		};
+	}
+	
+
+	/**
+	 * Create a {@link SimplePushEventSource} with the supplied type and default
+	 * buffering behaviours. The SimplePushEventSource will respond to back
+	 * pressure requests from the consumers connected to it.
+	 * 
+	 * This is equivalent to: <code>
+	 *   buildSimpleEventSource(type).create();
+	 * </code>
+	 * 
+	 * @param type
+	 * @return a {@link SimplePushEventSource}
+	 */
+	public <T> SimplePushEventSource<T> createSimpleEventSource(Class<T> type) {
+		return createSimplePushEventSource(1, null,
+				new ArrayBlockingQueue<>(32),
+				FAIL.getPolicy(), () -> { /* Nothing else to do */ });
+	}
+	
+	/**
+	 * 
+	 * Build a {@link SimplePushEventSource} with the supplied type and custom
+	 * buffering behaviours. The SimplePushEventSource will respond to back
+	 * pressure requests from the consumers connected to it.
+	 * 
+	 * @param type
+	 * 
+	 * @return a {@link SimplePushEventSource}
+	 */
+
+	public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<SimplePushEventSource<T>,T,U> buildSimpleEventSource(
+			Class<T> type) {
+		return new AbstractBufferBuilder<SimplePushEventSource<T>,T,U>() {
+			@Override
+			public SimplePushEventSource<T> create() {
+				return createSimplePushEventSource(concurrency, worker, buffer,
+						bufferingPolicy, () -> { /* Nothing else to do */ });
+			}
+		};
+	}
+	
+	@SuppressWarnings({
+			"unchecked", "rawtypes"
+	})
+	<T, U extends BlockingQueue<PushEvent< ? extends T>>> SimplePushEventSource<T> createSimplePushEventSource(
+			int parallelism, Executor executor, U queue,
+			QueuePolicy<T,U> queuePolicy, Runnable onClose) {
+
+		if (parallelism < 0) {
+			throw new IllegalArgumentException(
+					"The supplied parallelism cannot be less than zero. It was "
+							+ parallelism);
+		} else if (parallelism == 0) {
+			parallelism = 1;
+		}
+
+		boolean closeExecutorOnClose;
+		Executor toUse;
+		if (executor == null) {
+			toUse = Executors.newFixedThreadPool(2);
+			closeExecutorOnClose = true;
+		} else {
+			toUse = executor;
+			closeExecutorOnClose = false;
+		}
+
+		if (queue == null) {
+			queue = (U) new ArrayBlockingQueue(32);
+		}
+
+		if (queuePolicy == null) {
+			queuePolicy = FAIL.getPolicy();
+		}
+
+		SimplePushEventSourceImpl<T,U> spes = new SimplePushEventSourceImpl<T,U>(
+				toUse, acquireScheduler(), queuePolicy, queue, parallelism,
+				() -> {
+					try {
+						onClose.run();
+					} catch (Exception e) {
+						// TODO log this?
+					}
+					if (closeExecutorOnClose) {
+						((ExecutorService) toUse).shutdown();
+					}
+					releaseScheduler();
+				});
+		return spes;
+	}
+
+	/**
+	 * Create a buffered {@link PushEventConsumer} with the default configured
+	 * buffer, executor size, queue, queue policy and pushback policy. This is
+	 * equivalent to calling
+	 * 
+	 * <code>
+	 *   buildBufferedConsumer(delegate).create();
+	 * </code>
+	 * 
+	 * <p>
+	 * The returned consumer will be buffered from the event source, and will
+	 * honour back pressure requests from its delegate even if the event source
+	 * does not.
+	 * 
+	 * <p>
+	 * Buffered consumers are useful for "bursty" event sources which produce a
+	 * number of events close together, then none for some time. These bursts
+	 * can sometimes overwhelm the consumer. Buffering will not, however,
+	 * protect downstream components from a source which produces events faster
+	 * than they can be consumed.
+	 * 
+	 * @param delegate
+	 * @return a {@link PushEventConsumer} with a buffer directly before it
+	 */
+	public <T> PushEventConsumer<T> createBufferedConsumer(
+			PushEventConsumer<T> delegate) {
+		return buildBufferedConsumer(delegate).create();
+	}
+	
+	/**
+	 * Build a buffered {@link PushEventConsumer} with custom configuration.
+	 * <p>
+	 * The returned consumer will be buffered from the event source, and will
+	 * honour back pressure requests from its delegate even if the event source
+	 * does not.
+	 * <p>
+	 * Buffered consumers are useful for "bursty" event sources which produce a
+	 * number of events close together, then none for some time. These bursts
+	 * can sometimes overwhelm the consumer. Buffering will not, however,
+	 * protect downstream components from a source which produces events faster
+	 * than they can be consumed.
+	 * <p>
+	 * Buffers are also useful as "circuit breakers". If a
+	 * {@link QueuePolicyOption#FAIL} is used then a full buffer will request
+	 * that the stream close, preventing an event storm from reaching the
+	 * client.
+	 * <p>
+	 * Note that this buffered consumer will close when it receives a terminal
+	 * event, or if the delegate returns negative backpressure. No further
+	 * events will be propagated after this time.
+	 * 
+	 * @param delegate
+	 * @return a {@link PushEventConsumer} with a buffer directly before it
+	 */
+	public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<PushEventConsumer<T>,T,U> buildBufferedConsumer(
+			PushEventConsumer<T> delegate) {
+		return new AbstractBufferBuilder<PushEventConsumer<T>,T,U>() {
+			@Override
+			public PushEventConsumer<T> create() {
+				PushEventPipe<T> pipe = new PushEventPipe<>();
+				
+				createStream(pipe, concurrency, worker, buffer, bufferingPolicy, backPressure)
+					.forEachEvent(delegate);
+				
+				return pipe;
+			}
+		};
+	}
+
+	static final class PushEventPipe<T>
+			implements PushEventConsumer<T>, PushEventSource<T> {
+
+		volatile PushEventConsumer< ? super T> delegate;
+
+		@Override
+		public AutoCloseable open(PushEventConsumer< ? super T> pec)
+				throws Exception {
+			return () -> { /* Nothing else to do */ };
+		}
+
+		@Override
+		public long accept(PushEvent< ? extends T> event) throws Exception {
+			return delegate.accept(event);
+		}
+
+	}
+
+	/**
+	 * Create an Unbuffered {@link PushStream} from a Java {@link Stream} The
+	 * data from the stream will be pushed into the PushStream synchronously as
+	 * it is opened. This may make terminal operations blocking unless a buffer
+	 * has been added to the {@link PushStream}. Care should be taken with
+	 * infinite {@link Stream}s to avoid blocking indefinitely.
+	 * 
+	 * @param items The items to push into the PushStream
+	 * @return A PushStream containing the items from the Java Stream
+	 */
+	public <T> PushStream<T> streamOf(Stream<T> items) {
+		PushEventSource<T> pes = aec -> {
+			AtomicBoolean closed = new AtomicBoolean(false);
+
+			items.mapToLong(i -> {
+				try {
+					long returnValue = closed.get() ? -1 : aec.accept(data(i));
+					if (returnValue < 0) {
+						aec.accept(PushEvent.<T> close());
+					}
+					return returnValue;
+				} catch (Exception e) {
+					try {
+						aec.accept(PushEvent.<T> error(e));
+					} catch (Exception e2) {/* No further events needed */}
+					return -1;
+				}
+			}).filter(i -> i < 0).findFirst().orElseGet(() -> {
+				try {
+					return aec.accept(PushEvent.<T> close());
+				} catch (Exception e) {
+					return -1;
+				}
+			});
+
+			return () -> closed.set(true);
+		};
+
+		return this.<T> createUnbufferedStream(pes, null);
+	}
+
+	/**
+	 * Create an Unbuffered {@link PushStream} from a Java {@link Stream} The
+	 * data from the stream will be pushed into the PushStream asynchronously
+	 * using the supplied Executor.
+	 * 
+	 * @param executor The worker to use to push items from the Stream into the
+	 *            PushStream
+	 * @param items The items to push into the PushStream
+	 * @return A PushStream containing the items from the Java Stream
+	 */
+	public <T> PushStream<T> streamOf(Executor executor, Stream<T> items) {
+
+		boolean closeExecutorOnClose;
+		Executor toUse;
+		if (executor == null) {
+			toUse = Executors.newFixedThreadPool(2);
+			closeExecutorOnClose = true;
+		} else {
+			toUse = executor;
+			closeExecutorOnClose = false;
+		}
+
+		@SuppressWarnings("resource")
+		PushStream<T> stream = new UnbufferedPushStreamImpl<T,BlockingQueue<PushEvent< ? extends T>>>(
+				this, toUse, acquireScheduler(), aec -> {
+					return () -> { /* No action to take */ };
+				}) {
+
+			@Override
+			protected boolean begin() {
+				if (super.begin()) {
+					Iterator<T> it = items.iterator();
+
+					toUse.execute(() -> pushData(it));
+
+					return true;
+				}
+				return false;
+			}
+
+			private void pushData(Iterator<T> it) {
+				while (it.hasNext()) {
+					try {
+						long returnValue = closed.get() == CLOSED ? -1
+								: handleEvent(data(it.next()));
+						if (returnValue != 0) {
+							if (returnValue < 0) {
+								close();
+								return;
+							} else {
+								scheduler.schedule(
+										() -> toUse.execute(() -> pushData(it)),
+										returnValue, MILLISECONDS);
+								return;
+							}
+						}
+					} catch (Exception e) {
+						close(error(e));
+					}
+				}
+				close();
+			}
+		};
+
+		stream = stream.onClose(() -> {
+			if (closeExecutorOnClose) {
+				((ExecutorService) toUse).shutdown();
+			}
+			releaseScheduler();
+		}).map(Function.identity());
+
+		return stream;
+	}
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushbackPolicy.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushbackPolicy.java
new file mode 100644
index 0000000..4f7f186
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushbackPolicy.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+import org.osgi.annotation.versioning.ConsumerType;
+
+/**
+ * A {@link PushbackPolicy} is used to calculate how much back pressure to apply
+ * based on the current buffer. The {@link PushbackPolicy} will be called after
+ * an event has been queued, and the returned value will be used as back
+ * pressure.
+ * 
+ * @see PushbackPolicyOption
+ * 
+ *
+ * @param <T> The type of the data
+ * @param <U> The type of the queue
+ */
+@ConsumerType
+@FunctionalInterface
+public interface PushbackPolicy<T, U extends BlockingQueue<PushEvent<? extends T>>> {
+	
+	/**
+	 * Given the current state of the queue, determine the level of back
+	 * pressure that should be applied
+	 * 
+	 * @param queue
+	 * @return a back pressure value in nanoseconds
+	 * @throws Exception
+	 */
+	public long pushback(U queue) throws Exception;
+	
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushbackPolicyOption.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushbackPolicyOption.java
new file mode 100644
index 0000000..ecd0e3e
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/PushbackPolicyOption.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * {@link PushbackPolicyOption} provides a standard set of simple
+ * {@link PushbackPolicy} implementations.
+ * 
+ * @see PushbackPolicy
+ */
+public enum PushbackPolicyOption {
+
+	/**
+	 * Returns a fixed amount of back pressure, independent of how full the
+	 * buffer is
+	 */
+	FIXED {
+		@Override
+		public <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value) {
+			return q -> value;
+		}
+	},
+	/**
+	 * Returns zero back pressure until the buffer is full, then it returns a
+	 * fixed value
+	 */
+	ON_FULL_FIXED {
+		@Override
+		public <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value) {
+			return q -> q.remainingCapacity() == 0 ? value : 0;
+		}
+	},
+	/**
+	 * Returns zero back pressure until the buffer is full, then it returns an
+	 * exponentially increasing amount, starting with the supplied value and
+	 * doubling it each time. Once the buffer is no longer full the back
+	 * pressure returns to zero.
+	 */
+	ON_FULL_EXPONENTIAL {
+		@Override
+		public <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value) {
+			AtomicInteger backoffCount = new AtomicInteger(0);
+			return q -> {
+				if (q.remainingCapacity() == 0) {
+					return value << backoffCount.getAndIncrement();
+				}
+				backoffCount.set(0);
+				return 0;
+			};
+
+		}
+	},
+	/**
+	 * Returns zero back pressure when the buffer is empty, then it returns a
+	 * linearly increasing amount of back pressure based on how full the buffer
+	 * is. The maximum value will be returned when the buffer is full.
+	 */
+	LINEAR {
+		@Override
+		public <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value) {
+			return q -> {
+				long remainingCapacity = q.remainingCapacity();
+				long used = q.size();
+				return (value * used) / (used + remainingCapacity);
+			};
+		}
+	};
+
+	/**
+	 * Create a {@link PushbackPolicy} instance configured with a base back
+	 * pressure time in nanoseconds
+	 * 
+	 * The actual backpressure returned will vary based on the selected
+	 * implementation, the base value, and the state of the buffer.
+	 * 
+	 * @param value
+	 * @return A {@link PushbackPolicy} to use
+	 */
+	public abstract <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value);
+
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/QueuePolicy.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/QueuePolicy.java
new file mode 100644
index 0000000..cba94b1
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/QueuePolicy.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.osgi.annotation.versioning.ConsumerType;
+import org.osgi.util.pushstream.PushEvent.EventType;
+
+/**
+ * A {@link QueuePolicy} is used to control how events should be queued in the
+ * current buffer. The {@link QueuePolicy} will be called when an event has
+ * arrived.
+ * 
+ * @see QueuePolicyOption
+ * 
+ *
+ * @param <T> The type of the data
+ * @param <U> The type of the queue
+ */
+
+@ConsumerType
+@FunctionalInterface
+public interface QueuePolicy<T, U extends BlockingQueue<PushEvent<? extends T>>> { 
+	
+	/**
+	 * Enqueue the event and return the remaining capacity available for events
+	 * 
+	 * @param queue
+	 * @param event
+	 * @throws Exception If an error ocurred adding the event to the queue. This
+	 *         exception will cause the connection between the
+	 *         {@link PushEventSource} and the {@link PushEventConsumer} to be
+	 *         closed with an {@link EventType#ERROR}
+	 */
+	public void doOffer(U queue, PushEvent<? extends T> event) throws Exception;
+	
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/QueuePolicyOption.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/QueuePolicyOption.java
new file mode 100644
index 0000000..35df890
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/QueuePolicyOption.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * {@link QueuePolicyOption} provides a standard set of simple
+ * {@link QueuePolicy} implementations.
+ * 
+ * @see QueuePolicy
+ */
+public enum QueuePolicyOption {
+	/**
+	 * Attempt to add the supplied event to the queue. If the queue is unable to
+	 * immediately accept the value then discard the value at the head of the
+	 * queue and try again. Repeat this process until the event is enqueued.
+	 */
+	DISCARD_OLDEST {
+		@Override
+		public <T, U extends BlockingQueue<PushEvent<? extends T>>> QueuePolicy<T, U> getPolicy() {
+			return (queue, event) -> {
+				while (!queue.offer(event)) {
+					queue.poll();
+				}
+			};
+		}
+	},
+	/**
+	 * Attempt to add the supplied event to the queue, blocking until the
+	 * enqueue is successful.
+	 */
+	BLOCK {
+		@Override
+		public <T, U extends BlockingQueue<PushEvent<? extends T>>> QueuePolicy<T, U> getPolicy() {
+			return (queue, event) -> {
+				try {
+					queue.put(event);
+				} catch (InterruptedException e) {
+					// TODO Auto-generated catch block
+					e.printStackTrace();
+				}
+			};
+		}
+	},
+	/**
+	 * Attempt to add the supplied event to the queue, throwing an exception if
+	 * the queue is full.
+	 */
+	FAIL {
+		@Override
+		public <T, U extends BlockingQueue<PushEvent<? extends T>>> QueuePolicy<T, U> getPolicy() {
+			return (queue, event) -> queue.add(event);
+		}
+	};
+
+	/**
+	 * @return a {@link QueuePolicy} implementation
+	 */
+	public abstract <T, U extends BlockingQueue<PushEvent<? extends T>>> QueuePolicy<T, U> getPolicy();
+
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/SimplePushEventSource.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/SimplePushEventSource.java
new file mode 100644
index 0000000..747b453
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/SimplePushEventSource.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import org.osgi.annotation.versioning.ProviderType;
+import org.osgi.util.promise.Promise;
+
+/**
+ * A {@link SimplePushEventSource} is a helper that makes it simpler to write a
+ * {@link PushEventSource}. Users do not need to manage multiple registrations
+ * to the stream, nor do they have to be concerned with back pressure.
+ *
+ * @param <T> The type of the events produced by this source
+ */
+@ProviderType
+public interface SimplePushEventSource<T>
+		extends PushEventSource<T>, AutoCloseable {
+	/**
+	 * Close this source. Calling this method indicates that there will never be
+	 * any more events published by it. Calling this method sends a close event
+	 * to all connected consumers. After calling this method any
+	 * {@link PushEventConsumer} that tries to {@link #open(PushEventConsumer)}
+	 * this source will immediately receive a close event.
+	 */
+	@Override
+	void close();
+
+	/**
+	 * Asynchronously publish an event to this stream and all connected
+	 * {@link PushEventConsumer} instances. When this method returns there is no
+	 * guarantee that all consumers have been notified. Events published by a
+	 * single thread will maintain their relative ordering, however they may be
+	 * interleaved with events from other threads.
+	 * 
+	 * @param t
+	 * @throws IllegalStateException if the source is closed
+	 */
+	void publish(T t);
+
+	/**
+	 * Close this source for now, but potentially reopen it later. Calling this
+	 * method asynchronously sends a close event to all connected consumers.
+	 * After calling this method any {@link PushEventConsumer} that wishes may
+	 * {@link #open(PushEventConsumer)} this source, and will receive subsequent
+	 * events.
+	 */
+	void endOfStream();
+
+	/**
+	 * Close this source for now, but potentially reopen it later. Calling this
+	 * method asynchronously sends an error event to all connected consumers.
+	 * After calling this method any {@link PushEventConsumer} that wishes may
+	 * {@link #open(PushEventConsumer)} this source, and will receive subsequent
+	 * events.
+	 *
+	 * @param e the error
+	 */
+	void error(Exception e);
+
+	/**
+	 * Determine whether there are any {@link PushEventConsumer}s for this
+	 * {@link PushEventSource}. This can be used to skip expensive event
+	 * creation logic when there are no listeners.
+	 * 
+	 * @return true if any consumers are currently connected
+	 */
+	boolean isConnected();
+
+	/**
+	 * This method can be used to delay event generation until an event source
+	 * has connected. The returned promise will resolve as soon as one or more
+	 * {@link PushEventConsumer} instances have opened the
+	 * SimplePushEventSource.
+	 * <p>
+	 * The returned promise may already be resolved if this
+	 * {@link SimplePushEventSource} already has connected consumers. If the
+	 * {@link SimplePushEventSource} is closed before the returned Promise
+	 * resolves then it will be failed with an {@link IllegalStateException}.
+	 * <p>
+	 * Note that the connected consumers are able to asynchronously close their
+	 * connections to this {@link SimplePushEventSource}, and therefore it is
+	 * possible that once the promise resolves this
+	 * {@link SimplePushEventSource} may no longer be connected to any
+	 * consumers.
+	 * 
+	 * @return A promise representing the connection state of this EventSource
+	 */
+	Promise<Void> connectPromise();
+
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java
new file mode 100644
index 0000000..e31c9bf
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java
@@ -0,0 +1,337 @@
+package org.osgi.util.pushstream;
+
+import static java.util.Collections.emptyList;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.stream.Collectors.toList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+
+import org.osgi.util.promise.Deferred;
+import org.osgi.util.promise.Promise;
+import org.osgi.util.promise.Promises;
+
+class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
+		implements SimplePushEventSource<T> {
+
+	private final Object								lock		= new Object();
+
+	private final Executor								worker;
+
+	private final ScheduledExecutorService				scheduler;
+
+	private final QueuePolicy<T,U>						queuePolicy;
+
+	private final U										queue;
+
+	private final int									parallelism;
+
+	private final Semaphore								semaphore;
+
+	private final List<PushEventConsumer< ? super T>>	connected	= new ArrayList<>();
+
+	private final Runnable								onClose;
+
+	private boolean										closed;
+	
+	private Deferred<Void>								connectPromise;
+
+	private boolean										waitForFinishes;
+
+
+	public SimplePushEventSourceImpl(Executor worker,
+			ScheduledExecutorService scheduler, QueuePolicy<T,U> queuePolicy,
+			U queue, int parallelism, Runnable onClose) {
+		this.worker = worker;
+		this.scheduler = scheduler;
+		this.queuePolicy = queuePolicy;
+		this.queue = queue;
+		this.parallelism = parallelism;
+		this.semaphore = new Semaphore(parallelism);
+		this.onClose = onClose;
+		this.closed = false;
+		this.connectPromise = null;
+	}
+
+	@Override
+	public AutoCloseable open(PushEventConsumer< ? super T> pec)
+			throws Exception {
+		Deferred<Void> toResolve = null;
+		synchronized (lock) {
+			if (closed) {
+				throw new IllegalStateException(
+						"This PushEventConsumer is closed");
+			}
+
+			toResolve = connectPromise;
+			connectPromise = null;
+
+			connected.add(pec);
+		}
+
+		if (toResolve != null) {
+			toResolve.resolve(null);
+		}
+
+		return () -> {
+			closeConsumer(pec, PushEvent.close());
+		};
+	}
+
+	private void closeConsumer(PushEventConsumer< ? super T> pec,
+			PushEvent<T> event) {
+		boolean sendClose;
+		synchronized (lock) {
+			sendClose = connected.remove(pec);
+		}
+		if (sendClose) {
+			doSend(pec, event);
+		}
+	}
+
+	private void doSend(PushEventConsumer< ? super T> pec, PushEvent<T> event) {
+		try {
+			worker.execute(() -> safePush(pec, event));
+		} catch (RejectedExecutionException ree) {
+			// TODO log?
+			if (!event.isTerminal()) {
+				close(PushEvent.error(ree));
+			} else {
+				safePush(pec, event);
+			}
+		}
+	}
+
+	@SuppressWarnings("boxing")
+	private Promise<Long> doSendWithBackPressure(
+			PushEventConsumer< ? super T> pec, PushEvent<T> event) {
+		Deferred<Long> d = new Deferred<>();
+		try {
+			worker.execute(
+					() -> d.resolve(System.nanoTime() + safePush(pec, event)));
+		} catch (RejectedExecutionException ree) {
+			// TODO log?
+			if (!event.isTerminal()) {
+				close(PushEvent.error(ree));
+				return Promises.resolved(System.nanoTime());
+			} else {
+				return Promises
+						.resolved(System.nanoTime() + safePush(pec, event));
+			}
+		}
+		return d.getPromise();
+	}
+
+	private long safePush(PushEventConsumer< ? super T> pec,
+			PushEvent<T> event) {
+		try {
+			long backpressure = pec.accept(event) * 1000000;
+			if (backpressure < 0 && !event.isTerminal()) {
+				closeConsumer(pec, PushEvent.close());
+				return -1;
+			}
+			return backpressure;
+		} catch (Exception e) {
+			// TODO log?
+			if (!event.isTerminal()) {
+				closeConsumer(pec, PushEvent.error(e));
+			}
+			return -1;
+		}
+	}
+
+	@Override
+	public void close() {
+		close(PushEvent.close());
+	}
+
+	private void close(PushEvent<T> event) {
+		List<PushEventConsumer< ? super T>> toClose;
+		Deferred<Void> toFail = null;
+		synchronized (lock) {
+			if(!closed) {
+				closed = true;
+				
+				toClose = new ArrayList<>(connected);
+				connected.clear();
+				queue.clear();
+
+				if(connectPromise != null) {
+					toFail = connectPromise;
+					connectPromise = null;
+				}
+			} else {
+				toClose = emptyList();
+			}
+		}
+
+		toClose.stream().forEach(pec -> doSend(pec, event));
+
+		if (toFail != null) {
+			toFail.resolveWith(closedConnectPromise());
+		}
+
+		onClose.run();
+	}
+
+	@Override
+	public void publish(T t) {
+		enqueueEvent(PushEvent.data(t));
+	}
+
+	@Override
+	public void endOfStream() {
+		enqueueEvent(PushEvent.close());
+	}
+
+	@Override
+	public void error(Exception e) {
+		enqueueEvent(PushEvent.error(e));
+	}
+
+	private void enqueueEvent(PushEvent<T> event) {
+		synchronized (lock) {
+			if (closed || connected.isEmpty()) {
+				return;
+			}
+		}
+
+		try {
+			queuePolicy.doOffer(queue, event);
+			boolean start;
+			synchronized (lock) {
+				start = !waitForFinishes && semaphore.tryAcquire();
+			}
+			if (start) {
+				startWorker();
+			}
+		} catch (Exception e) {
+			close(PushEvent.error(e));
+			throw new IllegalStateException(
+					"The queue policy threw an exception", e);
+		}
+	}
+
+	@SuppressWarnings({
+			"unchecked", "boxing"
+	})
+	private void startWorker() {
+		worker.execute(() -> {
+			try {
+				
+				for(;;) {
+					PushEvent<T> event;
+					List<PushEventConsumer< ? super T>> toCall;
+					boolean resetWait = false;
+					synchronized (lock) {
+						if(waitForFinishes) {
+							semaphore.release();
+							while(waitForFinishes) {
+								lock.notifyAll();
+								lock.wait();
+							}
+							semaphore.acquire();
+						}
+
+						event = (PushEvent<T>) queue.poll();
+						
+						if(event == null) {
+							break;
+						}
+
+						toCall = new ArrayList<>(connected);
+						if (event.isTerminal()) {
+							waitForFinishes = true;
+							resetWait = true;
+							connected.clear();
+							while (!semaphore.tryAcquire(parallelism - 1)) {
+								lock.wait();
+							}
+						}
+					}
+					
+					List<Promise<Long>> calls = toCall.stream().map(pec -> {
+						if (semaphore.tryAcquire()) {
+							try {
+								return doSendWithBackPressure(pec, event);
+							} finally {
+								semaphore.release();
+							}
+						} else {
+							return Promises.resolved(
+									System.nanoTime() + safePush(pec, event));
+						}
+					}).collect(toList());
+
+					long toWait = Promises.all(calls)
+							.map(l -> l.stream()
+									.max(Long::compareTo)
+										.orElseGet(() -> System.nanoTime()))
+							.getValue() - System.nanoTime();
+					
+					
+					if (toWait > 0) {
+						scheduler.schedule(this::startWorker, toWait,
+								NANOSECONDS);
+						return;
+					}
+
+					if (resetWait == true) {
+						synchronized (lock) {
+							waitForFinishes = false;
+							lock.notifyAll();
+						}
+					}
+				}
+
+				semaphore.release();
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+			}
+			if (queue.peek() != null && semaphore.tryAcquire()) {
+				try {
+					startWorker();
+				} catch (Exception e) {
+					close(PushEvent.error(e));
+				}
+			}
+		});
+
+	}
+
+	@Override
+	public boolean isConnected() {
+		synchronized (lock) {
+			return !connected.isEmpty();
+		}
+	}
+
+	@Override
+	public Promise<Void> connectPromise() {
+		synchronized (lock) {
+			if (closed) {
+				return closedConnectPromise();
+			}
+
+			if (connected.isEmpty()) {
+				if (connectPromise == null) {
+					connectPromise = new Deferred<>();
+				}
+				return connectPromise.getPromise();
+			} else {
+				return Promises.resolved(null);
+			}
+		}
+	}
+
+	private Promise<Void> closedConnectPromise() {
+		return Promises.failed(new IllegalStateException(
+				"This SimplePushEventSource is closed"));
+	}
+
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java
new file mode 100644
index 0000000..faf9e65
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java
@@ -0,0 +1,73 @@
+package org.osgi.util.pushstream;
+
+import static java.util.Optional.ofNullable;
+import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.*;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+class UnbufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
+	extends AbstractPushStreamImpl<T> implements PushStream<T> {
+	
+	protected final Function<PushEventConsumer<T>,AutoCloseable>	connector;
+	
+	protected final AtomicReference<AutoCloseable>					upstream	= new AtomicReference<AutoCloseable>();
+	
+	UnbufferedPushStreamImpl(PushStreamProvider psp,
+			Executor executor, ScheduledExecutorService scheduler,
+			Function<PushEventConsumer<T>,AutoCloseable> connector) {
+		super(psp, executor, scheduler);
+		this.connector = connector;
+	}
+
+	@Override
+	protected boolean close(PushEvent<T> event) {
+		if(super.close(event)) {
+			ofNullable(upstream.getAndSet(() -> {
+				// This block doesn't need to do anything, but the presence
+				// of the Closable is needed to prevent duplicate begins
+			})).ifPresent(c -> {
+					try {
+						c.close();
+					} catch (Exception e) {
+						// TODO Auto-generated catch block
+						e.printStackTrace();
+					}
+				});
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	protected boolean begin() {
+		if(closed.compareAndSet(BUILDING, STARTED)) {
+			AutoCloseable toClose = connector.apply(this::handleEvent);
+			if(!upstream.compareAndSet(null,toClose)) {
+				//TODO log that we tried to connect twice...
+				try {
+					toClose.close();
+				} catch (Exception e) {
+					// TODO Auto-generated catch block
+					e.printStackTrace();
+				}
+			}
+
+			if (closed.get() == CLOSED
+					&& upstream.compareAndSet(toClose, null)) {
+				// We closed before setting the upstream - close it now
+				try {
+					toClose.close();
+				} catch (Exception e) {
+					// TODO Auto-generated catch block
+					e.printStackTrace();
+				}
+			}
+			return true;
+		}
+		return false;
+	}
+}
diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/package-info.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/package-info.java
new file mode 100644
index 0000000..6a28fa0
--- /dev/null
+++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/util/pushstream/package-info.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Push Stream Package Version 1.0.
+ * 
+ * <p>
+ * Bundles wishing to use this package must list the package in the
+ * Import-Package header of the bundle's manifest.
+ * 
+ * <p>
+ * Example import for consumers using the API in this package:
+ * <p>
+ * {@code Import-Package: org.osgi.util.pushstream; version="[1.0,2.0)"}
+ * <p>
+ * Example import for providers implementing the API in this package:
+ * <p>
+ * {@code Import-Package: org.osgi.util.pushstream; version="[1.0,1.1)"}
+ * 
+ * @author $Id$
+ */
+
+@Version("1.0")
+package org.osgi.util.pushstream;
+
+import org.osgi.annotation.versioning.Version;