/*
* Copyright (c) OSGi Alliance (2015, 2017). All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.osgi.util.pushstream;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.CLOSED;
import static org.osgi.util.pushstream.PushEvent.*;
import static org.osgi.util.pushstream.PushbackPolicyOption.LINEAR;
import static org.osgi.util.pushstream.QueuePolicyOption.FAIL;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
import org.osgi.util.promise.PromiseFactory;
/**
* A factory for {@link PushStream} instances, and utility methods for handling
* {@link PushEventSource}s and {@link PushEventConsumer}s
*/
public final class PushStreamProvider {
private final Lock lock = new ReentrantLock(true);
private int schedulerReferences;
private ScheduledExecutorService sharedScheduler;
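	// The shared scheduler is created lazily and reference counted so that
	// streams and event sources which do not supply their own scheduler can
	// share a single timer thread. It is shut down again when the last
	// reference is released.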
private ScheduledExecutorService acquireScheduler() {
try {
lock.lockInterruptibly();
try {
schedulerReferences += 1;
if (schedulerReferences == 1) {
sharedScheduler = Executors
.newSingleThreadScheduledExecutor();
}
return sharedScheduler;
} finally {
lock.unlock();
}
} catch (InterruptedException e) {
throw new IllegalStateException("Unable to acquire the Scheduler",
e);
}
}
private void releaseScheduler() {
try {
lock.lockInterruptibly();
try {
schedulerReferences -= 1;
if (schedulerReferences == 0) {
sharedScheduler.shutdown();
sharedScheduler = null;
}
} finally {
lock.unlock();
}
		} catch (InterruptedException e) {
			// Preserve the interrupt status rather than swallowing it
			Thread.currentThread().interrupt();
		}
}
/**
 * Create a stream with the default configured buffer, executor, queue,
 * queue policy and pushback policy. This is equivalent to calling
*
* <code>
 * buildStream(source).build();
* </code>
*
* <p>
* This stream will be buffered from the event producer, and will honour
* back pressure even if the source does not.
*
* <p>
* Buffered streams are useful for "bursty" event sources which produce a
* number of events close together, then none for some time. These bursts
* can sometimes overwhelm downstream processors. Buffering will not,
* however, protect downstream components from a source which produces
* events faster (on average) than they can be consumed.
*
* <p>
* Event delivery will not begin until a terminal operation is reached on
 * the chain of PushStreams. Once a terminal operation is reached the
* stream will be connected to the event source.
*
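 * <p>
 * A minimal usage sketch (the event source {@code temperatureSource} and the
 * pipeline shown are illustrative only):
 *
 * <pre>{@code
 * PushStreamProvider psp = new PushStreamProvider();
 * PushEventSource<Integer> temperatureSource = getTemperatureSource();
 * psp.createStream(temperatureSource)
 *     .filter(t -> t > 20)
 *     .forEach(t -> System.out.println("Reading: " + t));
 * }</pre>
 *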
 * @param eventSource The source of the events
* @return A {@link PushStream} with a default initial buffer
*/
public <T> PushStream<T> createStream(PushEventSource<T> eventSource) {
return createStream(eventSource, 1, null, null,
new ArrayBlockingQueue<>(32),
FAIL.getPolicy(), LINEAR.getPolicy(1000));
}
/**
* Builds a push stream with custom configuration.
*
* <p>
*
* The resulting {@link PushStream} may be buffered or unbuffered depending
* on how it is configured.
*
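 * <p>
 * An illustrative configuration (where {@code psp} is a
 * {@link PushStreamProvider} and {@code source} an existing
 * {@link PushEventSource}):
 *
 * <pre>{@code
 * PushStream<Long> stream = psp.buildStream(source)
 *     .withBuffer(new ArrayBlockingQueue<>(64))
 *     .withQueuePolicy(QueuePolicyOption.BLOCK)
 *     .withPushbackPolicy(PushbackPolicyOption.FIXED, 50)
 *     .build();
 * }</pre>
 *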
* @param eventSource The source of the events
*
* @return A {@link PushStreamBuilder} for the stream
*/
public <T, U extends BlockingQueue<PushEvent< ? extends T>>> PushStreamBuilder<T,U> buildStream(
PushEventSource<T> eventSource) {
return new PushStreamBuilderImpl<T,U>(this, null, null, eventSource);
}
@SuppressWarnings({
"rawtypes", "unchecked"
})
<T, U extends BlockingQueue<PushEvent< ? extends T>>> PushStream<T> createStream(
PushEventSource<T> eventSource, int parallelism, Executor executor,
ScheduledExecutorService scheduler, U queue,
QueuePolicy<T,U> queuePolicy,
PushbackPolicy<T,U> pushbackPolicy) {
if (eventSource == null) {
throw new NullPointerException("There is no source of events");
}
if (parallelism < 0) {
throw new IllegalArgumentException(
"The supplied parallelism cannot be less than zero. It was "
+ parallelism);
} else if (parallelism == 0) {
parallelism = 1;
}
boolean closeExecutorOnClose;
Executor workerToUse;
if (executor == null) {
workerToUse = Executors.newFixedThreadPool(parallelism);
closeExecutorOnClose = true;
} else {
workerToUse = Objects.requireNonNull(executor);
closeExecutorOnClose = false;
}
boolean releaseSchedulerOnClose;
ScheduledExecutorService timerToUse;
if (scheduler == null) {
timerToUse = acquireScheduler();
releaseSchedulerOnClose = true;
} else {
timerToUse = Objects.requireNonNull(scheduler);
releaseSchedulerOnClose = false;
}
if (queue == null) {
queue = (U) new ArrayBlockingQueue(32);
}
if (queuePolicy == null) {
queuePolicy = FAIL.getPolicy();
}
if (pushbackPolicy == null) {
pushbackPolicy = LINEAR.getPolicy(1000);
}
PushStream<T> stream = new BufferedPushStreamImpl<>(this,
new PromiseFactory(workerToUse, timerToUse), queue,
parallelism, queuePolicy,
pushbackPolicy, aec -> {
try {
return eventSource.open(aec);
} catch (Exception e) {
throw new RuntimeException(
"Unable to connect to event source", e);
}
});
return cleanupThreads(closeExecutorOnClose, workerToUse,
releaseSchedulerOnClose, stream);
}
private <T> PushStream<T> cleanupThreads(boolean closeExecutorOnClose,
Executor workerToUse, boolean releaseSchedulerOnClose,
PushStream<T> stream) {
if (closeExecutorOnClose || releaseSchedulerOnClose) {
stream = stream.onClose(() -> {
if (closeExecutorOnClose) {
((ExecutorService) workerToUse).shutdown();
}
if (releaseSchedulerOnClose) {
releaseScheduler();
}
}).map(x -> x);
}
return stream;
}
<T> PushStream<T> createUnbufferedStream(PushEventSource<T> eventSource,
Executor executor, ScheduledExecutorService scheduler) {
boolean closeExecutorOnClose;
Executor workerToUse;
if (executor == null) {
workerToUse = Executors.newFixedThreadPool(2);
closeExecutorOnClose = true;
} else {
workerToUse = Objects.requireNonNull(executor);
closeExecutorOnClose = false;
}
boolean releaseSchedulerOnClose;
ScheduledExecutorService timerToUse;
if (scheduler == null) {
timerToUse = acquireScheduler();
releaseSchedulerOnClose = true;
} else {
timerToUse = Objects.requireNonNull(scheduler);
releaseSchedulerOnClose = false;
}
PushStream<T> stream = new UnbufferedPushStreamImpl<>(this,
new PromiseFactory(workerToUse, timerToUse),
aec -> {
try {
return eventSource.open(aec);
} catch (Exception e) {
throw new RuntimeException(
"Unable to connect to event source", e);
}
});
return cleanupThreads(closeExecutorOnClose, workerToUse,
releaseSchedulerOnClose, stream);
}
/**
 * Convert a {@link PushStream} into a {@link PushEventSource}. The first
* call to {@link PushEventSource#open(PushEventConsumer)} will begin event
* processing.
*
* The {@link PushEventSource} will remain active until the backing stream
 * is closed, and permits multiple consumers to be connected to it via
 * {@link PushEventSource#open(PushEventConsumer)}.
*
* This is equivalent to: <code>
 * buildEventSourceFromStream(stream).build();
* </code>
*
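 * <p>
 * An illustrative sketch (where {@code psp} is a {@link PushStreamProvider}
 * and {@code stream} an existing {@link PushStream}; both consumers simply
 * acknowledge each event, and exception handling is omitted for brevity):
 *
 * <pre>{@code
 * PushEventSource<String> source = psp.createEventSourceFromStream(stream);
 * // Each call to open() registers an additional consumer
 * AutoCloseable first = source.open(event -> 0);
 * AutoCloseable second = source.open(event -> 0);
 * }</pre>
 *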
 * @param stream The stream providing the events
* @return a {@link PushEventSource} backed by the {@link PushStream}
*/
public <T> PushEventSource<T> createEventSourceFromStream(
PushStream<T> stream) {
return buildEventSourceFromStream(stream).build();
}
/**
 * Convert a {@link PushStream} into a {@link PushEventSource}. The first
* call to {@link PushEventSource#open(PushEventConsumer)} will begin event
* processing.
*
* <p>
* The {@link PushEventSource} will remain active until the backing stream
 * is closed, and permits multiple consumers to be connected to it via
 * {@link PushEventSource#open(PushEventConsumer)}. Note that this means
* the caller of this method is responsible for closing the supplied
* stream if it is not finite in length.
*
* <p>Late joining
* consumers will not receive historical events, but will immediately
* receive the terminal event which closed the stream if the stream is
* already closed.
*
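 * <p>
 * An illustrative sketch using a custom buffer (where {@code psp} is a
 * {@link PushStreamProvider} and {@code stream} an existing
 * {@link PushStream}):
 *
 * <pre>{@code
 * PushEventSource<String> source = psp.buildEventSourceFromStream(stream)
 *     .withBuffer(new ArrayBlockingQueue<>(128))
 *     .withQueuePolicy(QueuePolicyOption.DISCARD_OLDEST)
 *     .build();
 * }</pre>
 *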
 * @param stream The stream providing the events
*
* @return a {@link PushEventSource} backed by the {@link PushStream}
*/
public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<PushEventSource<T>,T,U> buildEventSourceFromStream(
PushStream<T> stream) {
BufferBuilder<PushStream<T>,T,U> builder = stream.buildBuffer();
return new BufferBuilder<PushEventSource<T>,T,U>() {
@Override
public BufferBuilder<PushEventSource<T>,T,U> withBuffer(U queue) {
builder.withBuffer(queue);
return this;
}
@Override
public BufferBuilder<PushEventSource<T>,T,U> withQueuePolicy(
QueuePolicy<T,U> queuePolicy) {
builder.withQueuePolicy(queuePolicy);
return this;
}
@Override
public BufferBuilder<PushEventSource<T>,T,U> withQueuePolicy(
QueuePolicyOption queuePolicyOption) {
builder.withQueuePolicy(queuePolicyOption);
return this;
}
@Override
public BufferBuilder<PushEventSource<T>,T,U> withPushbackPolicy(
PushbackPolicy<T,U> pushbackPolicy) {
builder.withPushbackPolicy(pushbackPolicy);
return this;
}
@Override
public BufferBuilder<PushEventSource<T>,T,U> withPushbackPolicy(
PushbackPolicyOption pushbackPolicyOption, long time) {
builder.withPushbackPolicy(pushbackPolicyOption, time);
return this;
}
@Override
public BufferBuilder<PushEventSource<T>,T,U> withParallelism(
int parallelism) {
builder.withParallelism(parallelism);
return this;
}
@Override
public BufferBuilder<PushEventSource<T>,T,U> withExecutor(
Executor executor) {
builder.withExecutor(executor);
return this;
}
@Override
public BufferBuilder<PushEventSource<T>,T,U> withScheduler(
ScheduledExecutorService scheduler) {
builder.withScheduler(scheduler);
return this;
}
@Override
public PushEventSource<T> build() {
AtomicBoolean connect = new AtomicBoolean();
AtomicReference<PushEvent<T>> terminalEvent = new AtomicReference<>();
CopyOnWriteArrayList<PushEventConsumer< ? super T>> consumers = new CopyOnWriteArrayList<>();
return consumer -> {
consumers.add(consumer);
PushEvent<T> terminal = terminalEvent.get();
if (terminal != null) {
if (consumers.remove(consumer)) {
// The stream is already done and we missed it
consumer.accept(terminal);
}
return () -> {
//Nothing to do, we have already sent the terminal event
};
}
if(!connect.getAndSet(true)) {
// connect
builder.build()
.forEachEvent(new MultiplexingConsumer<T>(
terminalEvent, consumers));
}
return () -> {
if (consumers.remove(consumer)) {
try {
consumer.accept(PushEvent.close());
} catch (Exception ex) {
						// The consumer could not handle the close event;
						// nothing more can be done for it
ex.printStackTrace();
}
}
};
};
}
};
}
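	/**
	 * Delivers each event from the backing stream to every registered
	 * consumer, storing the terminal event so that late-joining consumers can
	 * be notified immediately, and removing consumers which fail or request
	 * that no further events be sent.
	 */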
private static class MultiplexingConsumer<T> implements PushEventConsumer<T> {
private final AtomicReference<PushEvent<T>> terminalEventStore;
private final CopyOnWriteArrayList<PushEventConsumer<? super T>> consumers;
public MultiplexingConsumer(
AtomicReference<PushEvent<T>> terminalEventStore,
CopyOnWriteArrayList<PushEventConsumer< ? super T>> consumers) {
super();
this.terminalEventStore = terminalEventStore;
this.consumers = consumers;
}
@Override
public long accept(PushEvent< ? extends T> event) throws Exception {
boolean isTerminal = event.isTerminal();
if(isTerminal) {
if(!terminalEventStore.compareAndSet(null, event.nodata())) {
// We got a duplicate terminal, silently ignore it
return -1;
}
for (PushEventConsumer< ? super T> pushEventConsumer : consumers) {
if(consumers.remove(pushEventConsumer)) {
try {
pushEventConsumer.accept(event);
} catch (Exception ex) {
						// The consumer could not handle the terminal event;
						// nothing more can be done for it
ex.printStackTrace();
}
}
}
return -1;
} else {
long maxBP = 0;
for (PushEventConsumer< ? super T> pushEventConsumer : consumers) {
try {
long tmpBP = pushEventConsumer.accept(event);
if(tmpBP < 0 && consumers.remove(pushEventConsumer)) {
try {
pushEventConsumer.accept(PushEvent.close());
} catch (Exception ex) {
							// The consumer could not handle the close event
							// sent after it requested no further events
ex.printStackTrace();
}
} else if (tmpBP > maxBP) {
maxBP = tmpBP;
}
} catch (Exception ex) {
if(consumers.remove(pushEventConsumer)) {
try {
pushEventConsumer.accept(PushEvent.error(ex));
} catch (Exception ex2) {
							// The consumer could not handle the error event;
							// nothing more can be done for it
ex2.printStackTrace();
}
}
}
}
return maxBP;
}
}
}
/**
* Create a {@link SimplePushEventSource} with the supplied type and default
* buffering behaviours. The SimplePushEventSource will respond to back
* pressure requests from the consumers connected to it.
*
* This is equivalent to: <code>
 * buildSimpleEventSource(type).build();
* </code>
*
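 * <p>
 * An illustrative sketch (where {@code psp} is a {@link PushStreamProvider};
 * the published values are arbitrary):
 *
 * <pre>{@code
 * SimplePushEventSource<String> ses = psp.createSimpleEventSource(String.class);
 * psp.createStream(ses).forEach(System.out::println);
 * ses.publish("hello");
 * ses.publish("world");
 * ses.endOfStream();
 * }</pre>
 *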
 * @param type The type of the events
* @return a {@link SimplePushEventSource}
*/
public <T> SimplePushEventSource<T> createSimpleEventSource(Class<T> type) {
return createSimplePushEventSource(1, null,
new ArrayBlockingQueue<>(32),
FAIL.getPolicy(), () -> { /* Nothing else to do */ });
}
/**
*
* Build a {@link SimplePushEventSource} with the supplied type and custom
* buffering behaviours. The SimplePushEventSource will respond to back
* pressure requests from the consumers connected to it.
*
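 * <p>
 * An illustrative sketch with a bounded buffer (where {@code psp} is a
 * {@link PushStreamProvider}; the buffer size is arbitrary):
 *
 * <pre>{@code
 * SimplePushEventSource<String> ses = psp.buildSimpleEventSource(String.class)
 *     .withBuffer(new ArrayBlockingQueue<>(16))
 *     .withQueuePolicy(QueuePolicyOption.BLOCK)
 *     .build();
 * }</pre>
 *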
 * @param type The type of the events
*
* @return a {@link SimplePushEventSource}
*/
public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<SimplePushEventSource<T>,T,U> buildSimpleEventSource(
Class<T> type) {
return new AbstractBufferBuilder<SimplePushEventSource<T>,T,U>() {
@Override
public SimplePushEventSource<T> build() {
return createSimplePushEventSource(concurrency, worker, buffer,
bufferingPolicy, () -> { /* Nothing else to do */ });
}
};
}
@SuppressWarnings({
"unchecked", "rawtypes"
})
<T, U extends BlockingQueue<PushEvent< ? extends T>>> SimplePushEventSource<T> createSimplePushEventSource(
int parallelism, Executor executor, U queue,
QueuePolicy<T,U> queuePolicy, Runnable onClose) {
if (parallelism < 0) {
throw new IllegalArgumentException(
"The supplied parallelism cannot be less than zero. It was "
+ parallelism);
} else if (parallelism == 0) {
parallelism = 1;
}
boolean closeExecutorOnClose;
Executor toUse;
if (executor == null) {
toUse = Executors.newFixedThreadPool(parallelism);
closeExecutorOnClose = true;
} else {
toUse = Objects.requireNonNull(executor);
closeExecutorOnClose = false;
}
if (queue == null) {
queue = (U) new ArrayBlockingQueue(32);
}
if (queuePolicy == null) {
queuePolicy = FAIL.getPolicy();
}
SimplePushEventSourceImpl<T,U> spes = new SimplePushEventSourceImpl<T,U>(
new PromiseFactory(toUse, acquireScheduler()), queuePolicy,
queue, parallelism,
() -> {
try {
onClose.run();
} catch (Exception e) {
						// Swallow any exception thrown by the user-supplied
						// close action; there is nowhere useful to report it
}
if (closeExecutorOnClose) {
((ExecutorService) toUse).shutdown();
}
releaseScheduler();
});
return spes;
}
/**
* Create a buffered {@link PushEventConsumer} with the default configured
* buffer, executor size, queue, queue policy and pushback policy. This is
* equivalent to calling
*
* <code>
 * buildBufferedConsumer(delegate).build();
* </code>
*
* <p>
* The returned consumer will be buffered from the event source, and will
* honour back pressure requests from its delegate even if the event source
* does not.
*
* <p>
* Buffered consumers are useful for "bursty" event sources which produce a
* number of events close together, then none for some time. These bursts
* can sometimes overwhelm the consumer. Buffering will not, however,
* protect downstream components from a source which produces events faster
* than they can be consumed.
*
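 * <p>
 * An illustrative sketch (where {@code psp} is a {@link PushStreamProvider};
 * the delegate simply prints the data of each non-terminal event):
 *
 * <pre>{@code
 * PushEventConsumer<String> buffered = psp.createBufferedConsumer(event -> {
 *     if (!event.isTerminal())
 *         System.out.println(event.getData());
 *     return 0;
 * });
 * }</pre>
 *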
 * @param delegate The consumer to which events will be delivered
* @return a {@link PushEventConsumer} with a buffer directly before it
*/
public <T> PushEventConsumer<T> createBufferedConsumer(
PushEventConsumer<T> delegate) {
return buildBufferedConsumer(delegate).build();
}
/**
* Build a buffered {@link PushEventConsumer} with custom configuration.
* <p>
* The returned consumer will be buffered from the event source, and will
* honour back pressure requests from its delegate even if the event source
* does not.
* <p>
* Buffered consumers are useful for "bursty" event sources which produce a
* number of events close together, then none for some time. These bursts
* can sometimes overwhelm the consumer. Buffering will not, however,
* protect downstream components from a source which produces events faster
* than they can be consumed.
* <p>
* Buffers are also useful as "circuit breakers". If a
* {@link QueuePolicyOption#FAIL} is used then a full buffer will request
* that the stream close, preventing an event storm from reaching the
* client.
* <p>
* Note that this buffered consumer will close when it receives a terminal
* event, or if the delegate returns negative backpressure. No further
* events will be propagated after this time.
*
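 * <p>
 * An illustrative "circuit breaker" configuration (where {@code psp} is a
 * {@link PushStreamProvider} and {@code delegate} an existing
 * {@link PushEventConsumer}):
 *
 * <pre>{@code
 * PushEventConsumer<String> buffered = psp.buildBufferedConsumer(delegate)
 *     .withBuffer(new ArrayBlockingQueue<>(256))
 *     .withQueuePolicy(QueuePolicyOption.FAIL)
 *     .build();
 * }</pre>
 *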
 * @param delegate The consumer to which events will be delivered
* @return a {@link PushEventConsumer} with a buffer directly before it
*/
public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<PushEventConsumer<T>,T,U> buildBufferedConsumer(
PushEventConsumer<T> delegate) {
return new AbstractBufferBuilder<PushEventConsumer<T>,T,U>() {
@Override
public PushEventConsumer<T> build() {
PushEventPipe<T> pipe = new PushEventPipe<>();
createStream(pipe, concurrency, worker, timer, buffer,
bufferingPolicy, backPressure)
.forEachEvent(delegate);
return pipe;
}
};
}
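	/**
	 * A simple adapter which is registered as the consumer of a buffered
	 * stream and forwards every event it receives to the delegate captured
	 * when {@link #open(PushEventConsumer)} is called.
	 */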
static final class PushEventPipe<T>
implements PushEventConsumer<T>, PushEventSource<T> {
volatile PushEventConsumer< ? super T> delegate;
@Override
public AutoCloseable open(PushEventConsumer< ? super T> pec)
throws Exception {
			// Remember the consumer so that accept() can forward events to it
			delegate = pec;
			return () -> { /* Nothing else to do */ };
}
@Override
public long accept(PushEvent< ? extends T> event) throws Exception {
return delegate.accept(event);
}
}
/**
 * Create an unbuffered {@link PushStream} from a Java {@link Stream}. The
* data from the stream will be pushed into the PushStream synchronously as
* it is opened. This may make terminal operations blocking unless a buffer
* has been added to the {@link PushStream}. Care should be taken with
* infinite {@link Stream}s to avoid blocking indefinitely.
*
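 * <p>
 * An illustrative sketch (where {@code psp} is a {@link PushStreamProvider};
 * the values are arbitrary):
 *
 * <pre>{@code
 * psp.streamOf(Stream.of("a", "b", "c"))
 *     .map(String::toUpperCase)
 *     .forEach(System.out::println);
 * }</pre>
 *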
* @param items The items to push into the PushStream
* @return A PushStream containing the items from the Java Stream
*/
public <T> PushStream<T> streamOf(Stream<T> items) {
PushEventSource<T> pes = aec -> {
AtomicBoolean closed = new AtomicBoolean(false);
items.mapToLong(i -> {
try {
long returnValue = closed.get() ? -1 : aec.accept(data(i));
if (returnValue < 0) {
aec.accept(PushEvent.<T> close());
}
return returnValue;
} catch (Exception e) {
try {
aec.accept(PushEvent.<T> error(e));
} catch (Exception e2) {/* No further events needed */}
return -1;
}
}).filter(i -> i < 0).findFirst().orElseGet(() -> {
try {
return aec.accept(PushEvent.<T> close());
} catch (Exception e) {
return -1;
}
});
return () -> closed.set(true);
};
return this.<T> createUnbufferedStream(pes, null, null);
}
/**
 * Create an unbuffered {@link PushStream} from a Java {@link Stream}. The
* data from the stream will be pushed into the PushStream asynchronously
* using the supplied Executor.
*
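 * <p>
 * An illustrative sketch using a dedicated worker and the shared scheduler
 * (where {@code psp} is a {@link PushStreamProvider}; the executor is created
 * here purely for the example):
 *
 * <pre>{@code
 * ExecutorService worker = Executors.newSingleThreadExecutor();
 * psp.streamOf(worker, null, Stream.of(1, 2, 3))
 *     .forEach(System.out::println);
 * }</pre>
 *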
* @param executor The worker to use to push items from the Stream into the
* PushStream
* @param scheduler The scheduler to use to trigger timed events in the
* PushStream
* @param items The items to push into the PushStream
* @return A PushStream containing the items from the Java Stream
*/
public <T> PushStream<T> streamOf(Executor executor,
ScheduledExecutorService scheduler, Stream<T> items) {
boolean closeExecutorOnClose;
Executor workerToUse;
if (executor == null) {
workerToUse = Executors.newFixedThreadPool(2);
closeExecutorOnClose = true;
} else {
workerToUse = Objects.requireNonNull(executor);
closeExecutorOnClose = false;
}
boolean releaseSchedulerOnClose;
ScheduledExecutorService timerToUse;
if (scheduler == null) {
timerToUse = acquireScheduler();
releaseSchedulerOnClose = true;
} else {
timerToUse = Objects.requireNonNull(scheduler);
releaseSchedulerOnClose = false;
}
PushStream<T> stream = new UnbufferedPushStreamImpl<T,BlockingQueue<PushEvent< ? extends T>>>(
this, new PromiseFactory(workerToUse, timerToUse), aec -> {
return () -> { /* No action to take */ };
}) {
@Override
protected boolean begin() {
if (super.begin()) {
Iterator<T> it = items.iterator();
promiseFactory.executor().execute(() -> pushData(it));
return true;
}
return false;
}
private void pushData(Iterator<T> it) {
while (it.hasNext()) {
try {
long returnValue = closed.get() == CLOSED ? -1
: handleEvent(data(it.next()));
if (returnValue != 0) {
if (returnValue < 0) {
close();
return;
} else {
promiseFactory.scheduledExecutor()
.schedule(
() -> promiseFactory.executor()
.execute(() -> pushData(it)),
returnValue, MILLISECONDS);
return;
}
}
} catch (Exception e) {
close(error(e));
}
}
close();
}
};
return cleanupThreads(closeExecutorOnClose, workerToUse,
releaseSchedulerOnClose, stream);
}
}