Bug 521204 - [osgi R7] update log stream impl and API

Change-Id: I2425c2a2953d0d06ce8ffdad722d37f1f2327174
Signed-off-by: Thomas Watson <tjwatson@us.ibm.com>
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/service/log/stream/LogStreamProvider.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/service/log/stream/LogStreamProvider.java
index 99f5645..f48266b 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/service/log/stream/LogStreamProvider.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/service/log/stream/LogStreamProvider.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) OSGi Alliance (2016). All Rights Reserved.
+ * Copyright (c) OSGi Alliance (2016, 2017). All Rights Reserved.
  * 
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -19,10 +19,11 @@
 import org.osgi.annotation.versioning.ProviderType;
 import org.osgi.service.log.LogEntry;
 import org.osgi.util.pushstream.PushStream;
+import org.osgi.util.pushstream.QueuePolicyOption;
 
 /**
- * LogStreamProvider service for creating a PushStream of {@link LogEntry}
- * objects.
+ * LogStreamProvider service for creating a {@link PushStream} of
+ * {@link LogEntry} objects.
  * 
  * @ThreadSafe
  * @author $Id$
@@ -30,34 +31,42 @@
 @ProviderType
 public interface LogStreamProvider {
 	/**
-	 * Creation options for the PushStream of {@link LogEntry} objects.
+	 * Creation options for the {@link PushStream} of {@link LogEntry} objects.
 	 */
 	enum Options {
 		/**
 		 * Include history.
 		 * <p>
-		 * Prime the created PushStream with the past {@link LogEntry} objects.
-		 * The number of past {@link LogEntry} objects is implementation
-		 * specific.
+		 * Prime the created PushStream with the available historical
+		 * {@link LogEntry} objects. The number of available {@link LogEntry}
+		 * objects is implementation specific.
 		 * <p>
-		 * The created PushStream will supply the past {@link LogEntry} objects
-		 * followed by newly created {@link LogEntry} objects.
+		 * The created PushStream will supply the available historical
+		 * {@link LogEntry} objects followed by newly created {@link LogEntry}
+		 * objects.
 		 */
 		HISTORY;
 	}
 
 	/**
-	 * Create a PushStream of {@link LogEntry} objects.
+	 * Create a {@link PushStream} of {@link LogEntry} objects.
 	 * <p>
-	 * The returned PushStream is an unbuffered stream with a parallelism of
-	 * one.
+	 * The returned PushStream must:
+	 * <ul>
+	 * <li>Be buffered with a buffer large enough to contain the history, if
+	 * included.</li>
+	 * <li>Have the {@link QueuePolicyOption#DISCARD_OLDEST} queue policy
+	 * option.</li>
+	 * <li>Use a shared executor.</li>
+	 * <li>Have a parallelism of one.</li>
+	 * </ul>
 	 * <p>
 	 * When this LogStreamProvider service is released by the obtaining bundle,
-	 * this LogStreamProvider service must call {@code close()} on the returned
-	 * PushStream object if it has not already been closed.
+	 * this LogStreamProvider service must call {@link PushStream#close()} on
+	 * the returned PushStream object if it has not already been closed.
 	 * 
 	 * @param options The options to use when creating the PushStream.
-	 * @return A PushStream of {@link LogEntry} objects.
+	 * @return A {@link PushStream} of {@link LogEntry} objects.
 	 */
 	PushStream<LogEntry> createStream(Options... options);
 }
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java
index 2293c1a..d64ac12 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java
@@ -37,14 +37,14 @@
 import java.util.function.BiFunction;
 import java.util.function.BinaryOperator;
 import java.util.function.Consumer;
-import java.util.function.Function;
 import java.util.function.IntFunction;
 import java.util.function.IntSupplier;
-import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collector;
 import java.util.stream.Collectors;
 
+import org.osgi.util.function.Function;
+import org.osgi.util.function.Predicate;
 import org.osgi.util.promise.Deferred;
 import org.osgi.util.promise.Promise;
 import org.osgi.util.promise.TimeoutException;
@@ -52,6 +52,8 @@
 
 abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
 	
+	private final Function<T,T> IDENTITY = x -> x;
+
 	static enum State {
 		BUILDING, STARTED, CLOSED
 	}
@@ -70,6 +72,8 @@
 
 	protected abstract boolean begin();
 	
+	protected abstract void upstreamClose(PushEvent< ? > close);
+
 	AbstractPushStreamImpl(PushStreamProvider psp,
 			Executor executor, ScheduledExecutorService scheduler) {
 		this.psp = psp;
@@ -107,16 +111,23 @@
 	
 	@Override
 	public void close() {
-		close(PushEvent.close());
+		PushEvent<T> close = PushEvent.close();
+		if (close(close, true)) {
+			upstreamClose(close);
+		}
 	}
 	
 	protected boolean close(PushEvent<T> event) {
+		return close(event, true);
+	}
+
+	protected boolean close(PushEvent<T> event, boolean sendDownStreamEvent) {
 		if(!event.isTerminal()) {
 			throw new IllegalArgumentException("The event " + event  + " is not a close event.");
 		}
 		if(closed.getAndSet(CLOSED) != CLOSED) {
 			PushEventConsumer<T> aec = next.getAndSet(null);
-			if(aec != null) {
+			if (sendDownStreamEvent && aec != null) {
 				try {
 					aec.accept(event);
 				} catch (Exception e) {
@@ -411,7 +422,10 @@
 			scheduler.schedule(() -> check(lastTime, timeout),
 					timeout - elapsed, NANOSECONDS);
 		} else {
-			close(PushEvent.error(new TimeoutException()));
+			PushEvent<T> error = PushEvent.error(new TimeoutException());
+			close(error);
+			// Upstream close is needed as we have no direct backpressure
+			upstreamClose(error);
 		}
 	}
 
@@ -461,10 +475,18 @@
 				ex.execute(() -> {
 					try {
 						if (eventStream.handleEvent(event) < 0) {
-							eventStream.close(PushEvent.close());
+							PushEvent<T> close = PushEvent.close();
+							eventStream.close(close);
+							// Upstream close is needed as we have no direct
+							// backpressure
+							upstreamClose(close);
 						}
 					} catch (Exception e1) {
-						close(PushEvent.error(e1));
+						PushEvent<T> error = PushEvent.error(e1);
+						close(error);
+						// Upstream close is needed as we have no direct
+						// backpressure
+						upstreamClose(error);
 					} finally {
 						s.release(1);
 					}
@@ -541,7 +563,7 @@
 				// TODO Auto-generated catch block
 				e.printStackTrace();
 			} 
-		}).map(Function.identity());
+		}).map(IDENTITY);
 	}
 
 	@Override
@@ -591,6 +613,12 @@
 				}
 				return false;
 			}
+
+			@Override
+			protected void upstreamClose(PushEvent< ? > close) {
+				AbstractPushStreamImpl.this.upstreamClose(close);
+				source.close();
+			}
 		};
 		
 
@@ -607,7 +635,7 @@
 				// TODO Auto-generated catch block
 				e.printStackTrace();
 			} 
-		}).map(Function.identity());
+		}).map(IDENTITY);
 	}
 
 	@SuppressWarnings("unchecked")
@@ -803,7 +831,7 @@
 
 	private <R> long aggregateAndForward(Function<Collection<T>,R> f,
 			AbstractPushStreamImpl<R> eventStream,
-			PushEvent< ? extends T> event, Queue<T> queue) {
+			PushEvent< ? extends T> event, Queue<T> queue) throws Exception {
 		if (!queue.offer(event.getData())) {
 			((ArrayQueue<T>) queue).forcePush(event.getData());
 		}
@@ -820,7 +848,13 @@
 	@Override
 	public <R> PushStream<R> window(Duration time, Executor executor,
 			Function<Collection<T>,R> f) {
-		return window(() -> time, () -> 0, executor, (t, c) -> f.apply(c));
+		return window(() -> time, () -> 0, executor, (t, c) -> {
+			try {
+				return f.apply(c);
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+		});
 	}
 
 	@Override
@@ -946,7 +980,11 @@
 													.toMillis(elapsed)),
 											collected)));
 						} catch (Exception e) {
-							close(PushEvent.error(e));
+							PushEvent<T> error = PushEvent.error(e);
+							close(error);
+							// Upstream close is needed as we have no direct
+							// backpressure
+							upstreamClose(error);
 						}
 					});
 				}
@@ -1126,7 +1164,11 @@
 								Long.valueOf(NANOSECONDS.toMillis(elapsed)),
 								collected)));
 					} catch (Exception e) {
-						close(PushEvent.error(e));
+						PushEvent<T> error = PushEvent.error(e);
+						close(error);
+						// Upstream close is needed as we have no direct
+						// backpressure
+						upstreamClose(error);
 					}
 				});
 			}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferBuilder.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferBuilder.java
index 2aa6ec7..dfcf8b5 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferBuilder.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferBuilder.java
@@ -3,6 +3,8 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executor;
 
+import org.osgi.annotation.versioning.ProviderType;
+
 /**
  * Create a buffered section of a Push-based stream
  *
@@ -10,6 +12,7 @@
  * @param <T> The type of objects in the {@link PushEvent}
  * @param <U> The type of the Queue used in the user specified buffer
  */
+@ProviderType
 public interface BufferBuilder<R, T, U extends BlockingQueue<PushEvent<? extends T>>> {
 
 	/**
@@ -74,6 +77,6 @@
 	/**
 	 * @return the object being built
 	 */
-	R create();
+	R build();
 
 }
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java
index 3a4da2f..320adee 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java
@@ -31,5 +31,13 @@
 		// The base implementation has nothing to do, but
 		// this method is used in windowing
 	}
+
+	@Override
+	protected void upstreamClose(PushEvent< ? > close) {
+		if (closed.get() != CLOSED) {
+			close(close.nodata(), false);
+		}
+		previous.upstreamClose(close);
+	}
 	
 }
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java
index 028f0a3..968f668 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java
@@ -18,6 +18,8 @@
 
 import static org.osgi.util.pushstream.PushEvent.EventType.*;
 
+import org.osgi.annotation.versioning.ProviderType;
+
 /**
  * A PushEvent is an immutable object that is transferred through a
  * communication channel to push information to a downstream consumer. The event
@@ -35,6 +37,7 @@
  * @param <T> The payload type of the event.
  * @Immutable
  */
+@ProviderType
 public abstract class PushEvent<T> {
 
 	/**
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java
index c26bc8c..1471a84 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java
@@ -25,14 +25,14 @@
 import java.util.function.BiFunction;
 import java.util.function.BinaryOperator;
 import java.util.function.Consumer;
-import java.util.function.Function;
 import java.util.function.IntFunction;
 import java.util.function.IntSupplier;
-import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collector;
 
 import org.osgi.annotation.versioning.ProviderType;
+import org.osgi.util.function.Function;
+import org.osgi.util.function.Predicate;
 import org.osgi.util.promise.Promise;
 import org.osgi.util.promise.TimeoutException;
 
@@ -56,6 +56,16 @@
 public interface PushStream<T> extends AutoCloseable {
 
 	/**
+	 * Close this PushStream by sending an event of type
+	 * {@link PushEvent.EventType#CLOSE} downstream. Closing a PushStream is a
+	 * safe operation that will not throw an Exception.
+	 * <p>
+	 * Calling <code>close()</code> on a closed PushStream has no effect.
+	 */
+	@Override
+	void close();
+
+	/**
 	 * Must be run after the channel is closed. This handler will run after the
 	 * downstream methods have processed the close event and before the upstream
 	 * methods have closed.
@@ -222,12 +232,8 @@
 	 * {@link QueuePolicyOption#FAIL} is used then a full buffer will trigger
 	 * the stream to close, preventing an event storm from reaching the client.
 	 * 
-	 * @param parallelism
-	 * @param executor
-	 * @param queue
-	 * @param queuePolicy
-	 * @param pushbackPolicy
-	 * @return Builder style (can be a new or the same object)
+	 * @return A builder which can be used to configure the buffer for this
+	 *         pipeline stage.
 	 */
 	<U extends BlockingQueue<PushEvent< ? extends T>>> PushStreamBuilder<T,U> buildBuffer();
 
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilder.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilder.java
index d59c8d9..506c8f2 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilder.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilder.java
@@ -3,6 +3,8 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executor;
 
+import org.osgi.annotation.versioning.ProviderType;
+
 /**
  * A Builder for a PushStream. This Builder extends the support of a standard
  * BufferBuilder by allowing the PushStream to be unbuffered.
@@ -11,6 +13,7 @@
  * @param <T> The type of objects in the {@link PushEvent}
  * @param <U> The type of the Queue used in the user specified buffer
  */
+@ProviderType
 public interface PushStreamBuilder<T, U extends BlockingQueue<PushEvent< ? extends T>>>
 		extends BufferBuilder<PushStream<T>,T,U> {
 
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java
index 5ec7cb3..0bf8d71 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java
@@ -77,7 +77,7 @@
 	}
 
 	@Override
-	public PushStream<T> create() {
+	public PushStream<T> build() {
 		if (unbuffered) {
 			return psp.createUnbufferedStream(eventSource, previousExecutor);
 		} else {
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java
index be87c6b..f63c662 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java
@@ -32,7 +32,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Function;
 import java.util.stream.Stream;
 
 /**
@@ -193,7 +192,7 @@
 				((ExecutorService) toUse).shutdown();
 			}
 			releaseScheduler();
-		}).map(Function.identity());
+		}).map(x -> x);
 		return stream;
 	}
 
@@ -226,7 +225,7 @@
 				((ExecutorService) toUse).shutdown();
 			}
 			releaseScheduler();
-		}).map(Function.identity());
+		}).map(x -> x);
 
 		return stream;
 	}
@@ -249,7 +248,7 @@
 	 */
 	public <T> PushEventSource<T> createEventSourceFromStream(
 			PushStream<T> stream) {
-		return buildEventSourceFromStream(stream).create();
+		return buildEventSourceFromStream(stream).build();
 	}
 
 	/**
@@ -269,7 +268,7 @@
 			PushStream<T> stream) {
 		return new AbstractBufferBuilder<PushEventSource<T>,T,U>() {
 			@Override
-			public PushEventSource<T> create() {
+			public PushEventSource<T> build() {
 				SimplePushEventSource<T> spes = createSimplePushEventSource(
 						concurrency, worker, buffer, bufferingPolicy, () -> {
 							try {
@@ -321,7 +320,7 @@
 			Class<T> type) {
 		return new AbstractBufferBuilder<SimplePushEventSource<T>,T,U>() {
 			@Override
-			public SimplePushEventSource<T> create() {
+			public SimplePushEventSource<T> build() {
 				return createSimplePushEventSource(concurrency, worker, buffer,
 						bufferingPolicy, () -> { /* Nothing else to do */ });
 			}
@@ -403,7 +402,7 @@
 	 */
 	public <T> PushEventConsumer<T> createBufferedConsumer(
 			PushEventConsumer<T> delegate) {
-		return buildBufferedConsumer(delegate).create();
+		return buildBufferedConsumer(delegate).build();
 	}
 	
 	/**
@@ -435,7 +434,7 @@
 			PushEventConsumer<T> delegate) {
 		return new AbstractBufferBuilder<PushEventConsumer<T>,T,U>() {
 			@Override
-			public PushEventConsumer<T> create() {
+			public PushEventConsumer<T> build() {
 				PushEventPipe<T> pipe = new PushEventPipe<>();
 				
 				createStream(pipe, concurrency, worker, buffer, bufferingPolicy, backPressure)
@@ -574,7 +573,7 @@
 				((ExecutorService) toUse).shutdown();
 			}
 			releaseScheduler();
-		}).map(Function.identity());
+		}).map(x -> x);
 
 		return stream;
 	}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java
index faf9e65..d6116e3 100644
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java
@@ -24,25 +24,30 @@
 	}
 
 	@Override
-	protected boolean close(PushEvent<T> event) {
-		if(super.close(event)) {
-			ofNullable(upstream.getAndSet(() -> {
-				// This block doesn't need to do anything, but the presence
-				// of the Closable is needed to prevent duplicate begins
-			})).ifPresent(c -> {
-					try {
-						c.close();
-					} catch (Exception e) {
-						// TODO Auto-generated catch block
-						e.printStackTrace();
-					}
-				});
+	protected boolean close(PushEvent<T> event, boolean sendDownStreamEvent) {
+		if (super.close(event, sendDownStreamEvent)) {
+			upstreamClose(event);
 			return true;
 		}
 		return false;
 	}
 
 	@Override
+	protected void upstreamClose(PushEvent< ? > close) {
+		ofNullable(upstream.getAndSet(() -> {
+			// This block doesn't need to do anything, but the presence
+			// of the AutoCloseable is needed to prevent duplicate begins
+		})).ifPresent(c -> {
+				try {
+					c.close();
+				} catch (Exception e) {
+					// TODO Auto-generated catch block
+					e.printStackTrace();
+				}
+			});
+	}
+
+	@Override
 	protected boolean begin() {
 		if(closed.compareAndSet(BUILDING, STARTED)) {
 			AutoCloseable toClose = connector.apply(this::handleEvent);
diff --git a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java
index e32bf22..7c52b5a 100644
--- a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java
@@ -60,7 +60,7 @@
 			LogEntrySource logEntrySource = new LogEntrySource(withHistory);
 			PushStreamBuilder<LogEntry, BlockingQueue<PushEvent<? extends LogEntry>>> streamBuilder = pushStreamProvider.buildStream(logEntrySource);
 			//creating an unbuffered stream
-			PushStream<LogEntry> logStream = streamBuilder.unbuffered().create();
+			PushStream<LogEntry> logStream = streamBuilder.unbuffered().build();
 			logEntrySource.setLogStream(logStream);
 			// Adding to sources makes the source start listening for new entries
 			logEntrySources.add(logEntrySource);