Bug 521208 - PushStream returned by LogStreamProvider should be buffered
rather than unbuffered

Change-Id: I4ede21315e1d47ee5429ee70eed623bb6adda2cb
Signed-off-by: Anjum Fatima <anjum.eclipse@gmail.com>
diff --git a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamManager.java b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamManager.java
index 8cf9a5c..8c8a971 100644
--- a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamManager.java
+++ b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamManager.java
@@ -33,8 +33,8 @@
 	private ServiceRegistration<LogStreamProvider> logStreamServiceRegistration;
 	private LogStreamProviderFactory logStreamProviderFactory;
 	private ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService;
-	BundleContext context;
-	ReentrantLock eventProducerLock = new ReentrantLock();
+	private BundleContext context;
+	private final ReentrantLock eventProducerLock = new ReentrantLock();
 
 	/*
 	 * (non-Javadoc)
@@ -42,14 +42,11 @@
 	 */
 	@Override
 	public void start(BundleContext bc) throws Exception {
-
 		this.context = bc;
 		logReaderService = new ServiceTracker<>(context, LogReaderService.class, this);
 		logReaderService.open();
-
 		logStreamProviderFactory = new LogStreamProviderFactory(logReaderService);
 		logStreamServiceRegistration = context.registerService(LogStreamProvider.class, logStreamProviderFactory, null);
-
 	}
 
 	/*
@@ -61,6 +58,7 @@
 		logReaderService.close();
 		logStreamServiceRegistration.unregister();
 		logStreamServiceRegistration = null;
+		logStreamProviderFactory.shutdownExecutor();
 	}
 
 	/*
@@ -159,7 +157,6 @@
 
 	@Override
 	public void logged(LogEntry entry) {
-
 		logStreamProviderFactory.postLogEntry(entry);
 	}
 
diff --git a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderFactory.java b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderFactory.java
index f47d0fa..0582071 100644
--- a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderFactory.java
+++ b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderFactory.java
@@ -12,6 +12,9 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.osgi.framework.Bundle;
@@ -28,6 +31,16 @@
 	ReentrantReadWriteLock eventProducerLock = new ReentrantReadWriteLock();
 	ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService;
 
+	/* 
+	 * ExecutorService is used to provide parallelism of one by making sure only one thread is used for the executor
+	 */
+	private final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+		@Override
+		public Thread newThread(Runnable r) {
+			return new Thread(r, "LogStream thread");
+		}
+	});
+
 	public LogStreamProviderFactory(ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService) {
 		this.logReaderService = logReaderService;
 	}
@@ -58,7 +71,7 @@
 
 	@Override
 	public LogStreamProviderImpl getService(Bundle bundle, ServiceRegistration<LogStreamProvider> registration) {
-		LogStreamProviderImpl logStreamProviderImpl = new LogStreamProviderImpl(logReaderService);
+		LogStreamProviderImpl logStreamProviderImpl = new LogStreamProviderImpl(logReaderService, executor);
 		eventProducerLock.writeLock().lock();
 		try {
 			providers.put(bundle, logStreamProviderImpl);
@@ -85,9 +98,15 @@
 		} finally {
 			eventProducerLock.writeLock().unlock();
 		}
-
 		logStreamProviderImpl.close();
 
 	}
 
+	/*
+	 * Shutdown the executor
+	 */
+	public void shutdownExecutor() {
+		executor.shutdown();
+	}
+
 }
diff --git a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java
index 7c52b5a..94d9372 100644
--- a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java
+++ b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java
@@ -14,6 +14,8 @@
 import java.util.Set;
 import java.util.WeakHashMap;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.osgi.service.log.LogEntry;
@@ -23,6 +25,7 @@
 import org.osgi.util.pushstream.PushStream;
 import org.osgi.util.pushstream.PushStreamBuilder;
 import org.osgi.util.pushstream.PushStreamProvider;
+import org.osgi.util.pushstream.QueuePolicyOption;
 import org.osgi.util.tracker.ServiceTracker;
 
 public class LogStreamProviderImpl implements LogStreamProvider {
@@ -32,17 +35,22 @@
 	private final Set<LogEntrySource> logEntrySources = Collections.newSetFromMap(weakMap);
 
 	private final ReentrantReadWriteLock historyLock = new ReentrantReadWriteLock();
+	private final ExecutorService executor;
 
-	public LogStreamProviderImpl(ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService) {
+	public LogStreamProviderImpl(ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService, ExecutorService executor) {
 		this.logReaderService = logReaderService;
+		this.executor = executor;
 	}
 
 	/* Create a PushStream of {@link LogEntry} objects.
-	 * The returned PushStream is an unbuffered stream with a parallelism of one.
+	 * The returned PushStream is 
+	 * Buffered with a buffer large enough to contain the history, if included.
+	 * Have the QueuePolicyOption.DISCARD_OLDEST queue policy option.
+	 * Use a shared executor.
+	 * Have a parallelism of one.
 	 * (non-Javadoc)
 	 * @see org.osgi.service.log.stream.LogStreamProvider#createStream(org.osgi.service.log.stream.LogStreamProvider.Options[])
 	 */
-
 	@Override
 	public PushStream<LogEntry> createStream(Options... options) {
 		ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> withHistory = null;
@@ -59,8 +67,9 @@
 		try {
 			LogEntrySource logEntrySource = new LogEntrySource(withHistory);
 			PushStreamBuilder<LogEntry, BlockingQueue<PushEvent<? extends LogEntry>>> streamBuilder = pushStreamProvider.buildStream(logEntrySource);
-			//creating an unbuffered stream
-			PushStream<LogEntry> logStream = streamBuilder.unbuffered().build();
+			//creating a buffered push stream
+			LinkedBlockingQueue<PushEvent<? extends LogEntry>> historyQueue = new LinkedBlockingQueue<>();
+			PushStream<LogEntry> logStream = streamBuilder.withBuffer(historyQueue).withExecutor(executor).withQueuePolicy(QueuePolicyOption.DISCARD_OLDEST).build();
 			logEntrySource.setLogStream(logStream);
 			// Adding to sources makes the source start listening for new entries
 			logEntrySources.add(logEntrySource);