| /* |
| * Copyright (c) OSGi Alliance (2015, 2018). All Rights Reserved. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.osgi.util.pushstream; |
| |
| import java.time.Duration; |
| import java.util.Collection; |
| import java.util.Comparator; |
| import java.util.Optional; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.Executor; |
| import java.util.function.BiFunction; |
| import java.util.function.BinaryOperator; |
| import java.util.function.Consumer; |
| import java.util.function.IntFunction; |
| import java.util.function.IntSupplier; |
| import java.util.function.LongUnaryOperator; |
| import java.util.function.Supplier; |
| import java.util.function.ToLongBiFunction; |
| import java.util.stream.Collector; |
| |
| import org.osgi.annotation.versioning.ProviderType; |
| import org.osgi.util.function.Function; |
| import org.osgi.util.function.Predicate; |
| import org.osgi.util.promise.Promise; |
| import org.osgi.util.promise.TimeoutException; |
| |
| /** |
| * A Push Stream fulfills the same role as the Java 8 stream but it reverses the |
| * control direction. The Java 8 stream is pull based and this is push based. A |
| * Push Stream makes it possible to build a pipeline of transformations using a |
| * builder kind of model. Just like streams, it provides a number of terminating |
| * methods that will actually open the channel and perform the processing until |
| * the channel is closed (the source sends a Close event). The results of the |
| * processing will be sent to a Promise, just like any error events. A stream |
| * can be used multiple times. The Push Stream represents a pipeline. Upstream |
| * is in the direction of the source, downstream is in the direction of the |
| * terminating method. Events are sent downstream asynchronously with no |
| * guarantee for ordering or concurrency. Methods are available to provide |
| * serialization of the events and splitting in background threads. |
| * |
| * @param <T> The Payload type |
| */ |
| @ProviderType |
| public interface PushStream<T> extends AutoCloseable { |
| |
| /** |
| * Close this PushStream by sending an event of type |
| * {@link PushEvent.EventType#CLOSE} downstream. Closing a PushStream is a |
| * safe operation that will not throw an Exception. |
| * <p> |
| * Calling {@code close()} on a closed PushStream has no effect. |
| */ |
| @Override |
| void close(); |
| |
| /** |
| * Register a handler that must be run after the channel is closed. This |
| * handler will run after the downstream methods have processed the close |
| * event and before the upstream methods have closed. |
| * |
| * @param closeHandler Will be called on close |
| * @return This stream |
| */ |
| PushStream<T> onClose(Runnable closeHandler); |
| |
| /** |
| * Register a handler that receives a {@link Throwable}. As documented for |
| * {@link #onClose(Runnable)}, this handler will run after the downstream |
| * methods have processed the close event and before the upstream methods |
| * have closed. |
| * <p> |
| * NOTE(review): presumably the handler is only invoked when the stream is |
| * closed because of an error event, and is passed that failure; confirm |
| * against the implementation. |
| * |
| * @param closeHandler Will be called with the {@link Throwable}; despite |
| * its name this parameter is the error handler |
| * @return This stream |
| */ |
| PushStream<T> onError(Consumer< ? super Throwable> closeHandler); |
| |
| /** |
| * Only pass events downstream when the predicate tests true. |
| * |
| * @param predicate The predicate that is tested (not null) |
| * @return Builder style (can be a new or the same object) |
| */ |
| PushStream<T> filter(Predicate< ? super T> predicate); |
| |
| /** |
| * Map a payload value. |
| * |
| * @param <R> The payload type of the returned stream |
| * @param mapper The map function |
| * @return Builder style (can be a new or the same object) |
| */ |
| <R> PushStream<R> map(Function< ? super T, ? extends R> mapper); |
| |
| /** |
| * Asynchronously map the payload values. The mapping function returns a |
| * Promise representing the asynchronous mapping operation. |
| * <p> |
| * The PushStream limits the number of concurrently running mapping |
| * operations, and returns back pressure based on the number of existing |
| * queued operations. |
| * |
| * @param <R> The payload type of the returned stream |
| * @param n number of simultaneous promises to use |
| * @param delay Number of milliseconds of back pressure to apply per |
| * queued promise |
| * @param mapper The mapping function |
| * @return Builder style (can be a new or the same object) |
| * @throws IllegalArgumentException if the number of simultaneous promises |
| * is &lt; 1 or the delay is &lt; 0 |
| * @throws NullPointerException if the mapper is null |
| */ |
| <R> PushStream<R> asyncMap(int n, int delay, |
| Function< ? super T,Promise< ? extends R>> mapper); |
| |
| /** |
| * Flat map the payload value (turn one event into 0..n events of |
| * potentially another type). |
| * |
| * @param <R> The payload type of the returned stream |
| * @param mapper The flat map function |
| * @return Builder style (can be a new or the same object) |
| */ |
| <R> PushStream<R> flatMap( |
| Function< ? super T, ? extends PushStream< ? extends R>> mapper); |
| |
| /** |
| * Remove any duplicates. Notice that this can be expensive in a large |
| * stream since it must track previous payloads. |
| * |
| * @return Builder style (can be a new or the same object) |
| */ |
| PushStream<T> distinct(); |
| |
| /** |
| * Sort the elements, assuming that T extends Comparable. This is of |
| * course expensive for large or infinite streams since it requires |
| * buffering the stream until close. |
| * |
| * @return Builder style (can be a new or the same object) |
| */ |
| PushStream<T> sorted(); |
| |
| /** |
| * Sort the elements with the given comparator. This is of course |
| * expensive for large or infinite streams since it requires buffering the |
| * stream until close. |
| * |
| * @param comparator The comparator used to order the elements |
| * @return Builder style (can be a new or the same object) |
| */ |
| PushStream<T> sorted(Comparator< ? super T> comparator); |
| |
| /** |
| * Automatically close the channel after the maxSize number of elements is |
| * received. |
| * |
| * @param maxSize The number of elements after which the channel is closed |
| * @return Builder style (can be a new or the same object) |
| */ |
| PushStream<T> limit(long maxSize); |
| |
| /** |
| * Automatically close the channel after the given amount of time has |
| * elapsed. |
| * |
| * @param maxTime The maximum time that the stream should remain open |
| * @return Builder style (can be a new or the same object) |
| */ |
| PushStream<T> limit(Duration maxTime); |
| |
| /** |
| * Automatically fail the channel if no events are received for the |
| * indicated length of time. If the timeout is reached then a failure event |
| * containing a {@link TimeoutException} will be sent. |
| * |
| * @param idleTime The length of time that the stream should remain open |
| * when no events are being received. |
| * @return Builder style (can be a new or the same object) |
| */ |
| PushStream<T> timeout(Duration idleTime); |
| |
| /** |
| * Skip a number of events in the channel. |
| * |
| * @param n number of elements to skip |
| * @throws IllegalArgumentException if the number of events to skip is |
| * negative |
| * @return Builder style (can be a new or the same object) |
| */ |
| PushStream<T> skip(long n); |
| |
| /** |
| * Execute the downstream events in up to n background threads. If more |
| * requests are outstanding apply delay * nr of delayed threads back |
| * pressure. A downstream channel that is closed or throws an exception will |
| * cause all execution to cease and the stream to close. |
| * |
| * @param n number of simultaneous background threads to use |
| * @param delay Number of milliseconds of back pressure to apply per |
| * queued thread |
| * @param e an executor to use for the background threads. |
| * @return Builder style (can be a new or the same object) |
| * @throws IllegalArgumentException if the number of threads is &lt; 1 or |
| * the delay is &lt; 0 |
| * @throws NullPointerException if the Executor is null |
| */ |
| PushStream<T> fork(int n, int delay, Executor e); |
| |
| /** |
| * Buffer the events in a queue using default values for the queue size and |
| * other behaviors. Buffered work will be processed asynchronously in the |
| * rest of the chain. Buffering also blocks the transmission of back |
| * pressure to previous elements in the chain, although back pressure is |
| * honored by the buffer. |
| * <p> |
| * Buffers are useful for "bursty" event sources which produce a number of |
| * events close together, then none for some time. These bursts can |
| * sometimes overwhelm downstream event consumers. Buffering will not, |
| * however, protect downstream components from a source which produces |
| * events faster than they can be consumed. For fast sources |
| * {@link #filter(Predicate)}, {@link #coalesce(int, Function)} and |
| * {@link #fork(int, int, Executor)} are better choices. |
| * |
| * @return Builder style (can be a new or the same object) |
| */ |
| PushStream<T> buffer(); |
| |
| /** |
| * Build a buffer to enqueue events in a queue using custom values for the |
| * queue size and other behaviors. Buffered work will be processed |
| * asynchronously in the rest of the chain. Buffering also blocks the |
| * transmission of back pressure to previous elements in the chain, although |
| * back pressure is honored by the buffer. |
| * <p> |
| * Buffers are useful for "bursty" event sources which produce a number of |
| * events close together, then none for some time. These bursts can |
| * sometimes overwhelm downstream event consumers. Buffering will not, |
| * however, protect downstream components from a source which produces |
| * events faster than they can be consumed. For fast sources |
| * {@link #filter(Predicate)}, {@link #coalesce(int, Function)} and |
| * {@link #fork(int, int, Executor)} are better choices. |
| * <p> |
| * Buffers are also useful as "circuit breakers" in the pipeline. If a |
| * {@link QueuePolicyOption#FAIL} is used then a full buffer will trigger |
| * the stream to close, preventing an event storm from reaching the client. |
| * |
| * @param <U> The type of the queue that holds the buffered events |
| * @return A builder which can be used to configure the buffer for this |
| * pipeline stage. |
| */ |
| <U extends BlockingQueue<PushEvent< ? extends T>>> PushStreamBuilder<T,U> buildBuffer(); |
| |
| /** |
| * Merge in the events from another source. The resulting channel is not |
| * closed until this channel and the channel from the source are closed. |
| * |
| * @param source The source to merge in. |
| * @return Builder style (can be a new or the same object) |
| */ |
| PushStream<T> merge(PushEventSource< ? extends T> source); |
| |
| /** |
| * Merge in the events from another PushStream. The resulting channel is not |
| * closed until this channel and the channel from the source are closed. |
| * |
| * @param source The source to merge in. |
| * @return Builder style (can be a new or the same object) |
| */ |
| PushStream<T> merge(PushStream< ? extends T> source); |
| |
| /** |
| * Split the events to different streams based on a predicate. If the |
| * predicate is true, the event is dispatched to that channel on the same |
| * position. All predicates are tested for every event. |
| * <p> |
| * This method differs from other methods of PushStream in three significant |
| * ways: |
| * <ul> |
| * <li>The return value contains multiple streams.</li> |
| * <li>This stream will only close when all of these child streams have |
| * closed.</li> |
| * <li>Event delivery is made to all open children that accept the event. |
| * </li> |
| * </ul> |
| * |
| * @param predicates the predicates to test |
| * @return streams that map to the predicates |
| */ |
| @SuppressWarnings("unchecked") |
| PushStream<T>[] split(Predicate< ? super T>... predicates); |
| |
| /** |
| * Ensure that any events are delivered sequentially. That is, no |
| * overlapping calls downstream. This can be used to turn a forked stream |
| * (where for example a heavy conversion is done in multiple threads) back |
| * into a sequential stream so a reduce is simple to do. |
| * |
| * @return Builder style (can be a new or the same object) |
| */ |
| PushStream<T> sequential(); |
| |
| /** |
| * Coalesces a number of events into a new type of event. The input events |
| * are forwarded to an accumulator function. This function returns an |
| * Optional. If the optional is present, its value is sent downstream, |
| * otherwise it is ignored. |
| * |
| * @param <R> The payload type of the returned stream |
| * @param f The accumulator function; a present Optional result is sent |
| * downstream |
| * @return Builder style (can be a new or the same object) |
| */ |
| <R> PushStream<R> coalesce(Function< ? super T,Optional<R>> f); |
| |
| /** |
| * Coalesces a number of events into a new type of event. A fixed number of |
| * input events are forwarded to an accumulator function. This function |
| * returns new event data to be forwarded on. |
| * |
| * @param <R> The payload type of the returned stream |
| * @param count The fixed number of input events forwarded to the |
| * accumulator function |
| * @param f The accumulator function |
| * @return Builder style (can be a new or the same object) |
| */ |
| <R> PushStream<R> coalesce(int count, Function<Collection<T>,R> f); |
| |
| /** |
| * Coalesces a number of events into a new type of event. A variable number |
| * of input events are forwarded to an accumulator function. The number of |
| * events to be forwarded is determined by calling the count function. The |
| * accumulator function then returns new event data to be forwarded on. |
| * |
| * @param <R> The payload type of the returned stream |
| * @param count The count function, called to determine the number of |
| * events to be forwarded |
| * @param f The accumulator function |
| * @return Builder style (can be a new or the same object) |
| */ |
| <R> PushStream<R> coalesce(IntSupplier count, |
| Function<Collection<T>,R> f); |
| |
| /** |
| * Buffers a number of events over a fixed time interval and then forwards |
| * the events to an accumulator function. This function returns new event |
| * data to be forwarded on. Note that: |
| * <ul> |
| * <li>The collection forwarded to the accumulator function will be empty if |
| * no events arrived during the time interval.</li> |
| * <li>The accumulator function will be run and the forwarded event |
| * delivered as a different task, (and therefore potentially on a different |
| * thread) from the one that delivered the event to this {@link PushStream}. |
| * </li> |
| * <li>Due to the buffering and asynchronous delivery required, this method |
| * prevents the propagation of back-pressure to earlier stages</li> |
| * </ul> |
| * |
| * @param <R> The payload type of the returned stream |
| * @param d The fixed time interval over which events are buffered |
| * @param f The accumulator function |
| * @return Builder style (can be a new or the same object) |
| */ |
| <R> PushStream<R> window(Duration d, Function<Collection<T>,R> f); |
| |
| /** |
| * Buffers a number of events over a fixed time interval and then forwards |
| * the events to an accumulator function. This function returns new event |
| * data to be forwarded on. Note that: |
| * <ul> |
| * <li>The collection forwarded to the accumulator function will be empty if |
| * no events arrived during the time interval.</li> |
| * <li>The accumulator function will be run and the forwarded event |
| * delivered by a task given to the supplied executor.</li> |
| * <li>Due to the buffering and asynchronous delivery required, this method |
| * prevents the propagation of back-pressure to earlier stages</li> |
| * </ul> |
| * |
| * @param <R> The payload type of the returned stream |
| * @param d The fixed time interval over which events are buffered |
| * @param executor The executor that is given the task which runs the |
| * accumulator function and delivers the forwarded event |
| * @param f The accumulator function |
| * @return Builder style (can be a new or the same object) |
| */ |
| <R> PushStream<R> window(Duration d, Executor executor, |
| Function<Collection<T>,R> f); |
| |
| /** |
| * Buffers a number of events over a variable time interval and then |
| * forwards the events to an accumulator function. The length of time over |
| * which events are buffered is determined by the time function. A maximum |
| * number of events can also be requested, if this number of events is |
| * reached then the accumulator will be called early. The accumulator |
| * function returns new event data to be forwarded on. It is also given the |
| * length of time for which the buffer accumulated data. This may be less |
| * than the requested interval if the buffer reached the maximum number of |
| * requested events early. Note that: |
| * <ul> |
| * <li>The collection forwarded to the accumulator function will be empty if |
| * no events arrived during the time interval.</li> |
| * <li>The accumulator function will be run and the forwarded event |
| * delivered as a different task, (and therefore potentially on a different |
| * thread) from the one that delivered the event to this {@link PushStream}. |
| * </li> |
| * <li>Due to the buffering and asynchronous delivery required, this method |
| * prevents the propagation of back-pressure to earlier stages</li> |
| * <li>If the window finishes by hitting the maximum number of events then |
| * the remaining time in the window will be applied as back-pressure to the |
| * previous stage, attempting to slow the producer to the expected windowing |
| * threshold.</li> |
| * </ul> |
| * |
| * @param <R> The payload type of the returned stream |
| * @param timeSupplier The time function, which determines the length of |
| * time over which events are buffered |
| * @param maxEvents Supplies the maximum number of events to buffer before |
| * the accumulator is called early |
| * @param f The accumulator function; it is given the length of time for |
| * which the buffer accumulated data (the unit of this value is |
| * not stated here) and the buffered events |
| * @return Builder style (can be a new or the same object) |
| */ |
| <R> PushStream<R> window(Supplier<Duration> timeSupplier, |
| IntSupplier maxEvents, BiFunction<Long,Collection<T>,R> f); |
| |
| /** |
| * Buffers a number of events over a variable time interval and then |
| * forwards the events to an accumulator function. The length of time over |
| * which events are buffered is determined by the time function. A maximum |
| * number of events can also be requested, if this number of events is |
| * reached then the accumulator will be called early. The accumulator |
| * function returns new event data to be forwarded on. It is also given the |
| * length of time for which the buffer accumulated data. This may be less |
| * than the requested interval if the buffer reached the maximum number of |
| * requested events early. Note that: |
| * <ul> |
| * <li>The collection forwarded to the accumulator function will be empty if |
| * no events arrived during the time interval.</li> |
| * <li>The accumulator function will be run and the forwarded event |
| * delivered as a different task, (and therefore potentially on a different |
| * thread) from the one that delivered the event to this {@link PushStream}. |
| * </li> |
| * <li>If the window finishes by hitting the maximum number of events then |
| * the remaining time in the window will be applied as back-pressure to the |
| * previous stage, attempting to slow the producer to the expected windowing |
| * threshold.</li> |
| * </ul> |
| * |
| * @param <R> The payload type of the returned stream |
| * @param timeSupplier The time function, which determines the length of |
| * time over which events are buffered |
| * @param maxEvents Supplies the maximum number of events to buffer before |
| * the accumulator is called early |
| * @param executor An executor; presumably it is given the task which runs |
| * the accumulator function and delivers the forwarded event, as |
| * in {@link #window(Duration, Executor, Function)} (TODO confirm) |
| * @param f The accumulator function; it is given the length of time for |
| * which the buffer accumulated data (the unit of this value is |
| * not stated here) and the buffered events |
| * @return Builder style (can be a new or the same object) |
| */ |
| <R> PushStream<R> window(Supplier<Duration> timeSupplier, |
| IntSupplier maxEvents, Executor executor, |
| BiFunction<Long,Collection<T>,R> f); |
| |
| /** |
| * Changes the back-pressure propagated by this pipeline stage. |
| * <p> |
| * The supplied function receives the back pressure returned by the next |
| * pipeline stage and returns the back pressure that should be returned by |
| * this stage. This function will not be called if the previous pipeline |
| * stage returns negative back pressure. |
| * |
| * @param adjustment The function that maps the back pressure returned by |
| * the next stage to the back pressure returned by this stage |
| * @return Builder style (can be a new or the same object) |
| */ |
| PushStream<T> adjustBackPressure(LongUnaryOperator adjustment); |
| |
| /** |
| * Changes the back-pressure propagated by this pipeline stage. |
| * <p> |
| * The supplied function receives the data object passed to the next |
| * pipeline stage and the back pressure that was returned by that stage when |
| * accepting it. The function returns the back pressure that should be |
| * returned by this stage. This function will not be called if the previous |
| * pipeline stage returns negative back pressure. |
| * |
| * @param adjustment The function that maps the data object and the back |
| * pressure returned by the next stage to the back pressure |
| * returned by this stage |
| * @return Builder style (can be a new or the same object) |
| */ |
| PushStream<T> adjustBackPressure(ToLongBiFunction<T,Long> adjustment); |
| |
| /** |
| * Execute the action for each event received until the channel is closed. |
| * This is a terminating method, the returned promise is resolved when the |
| * channel closes. |
| * <p> |
| * This is a <strong>terminal operation</strong> |
| * |
| * @param action The action to perform |
| * @return A promise that is resolved when the channel closes. |
| */ |
| Promise<Void> forEach(Consumer< ? super T> action); |
| |
| /** |
| * Collect the payloads in an Object array after the channel is closed. This |
| * is a terminating method, the returned promise is resolved when the |
| * channel is closed. |
| * <p> |
| * This is a <strong>terminal operation</strong> |
| * |
| * @return A promise that is resolved with all the payloads received over |
| * the channel |
| */ |
| Promise<Object[]> toArray(); |
| |
| /** |
| * Collect the payloads in an array after the channel is closed. This is a |
| * terminating method, the returned promise is resolved when the channel is |
| * closed. The type of the array is handled by the caller using a generator |
| * function that gets the length of the desired array. |
| * <p> |
| * This is a <strong>terminal operation</strong> |
| * |
| * @param <A> The component type of the returned array |
| * @param generator The generator function, which gets the length of the |
| * desired array and returns an array of the desired type |
| * @return A promise that is resolved with all the payloads received over |
| * the channel |
| */ |
| <A extends T> Promise<A[]> toArray(IntFunction<A[]> generator); |
| |
| /** |
| * Standard reduce, see Stream. The returned promise will be resolved when |
| * the channel closes. |
| * <p> |
| * This is a <strong>terminal operation</strong> |
| * |
| * @param identity The identity/begin value |
| * @param accumulator The accumulator |
| * @return A promise that will be resolved with the result of the reduction |
| * when the channel closes |
| */ |
| Promise<T> reduce(T identity, BinaryOperator<T> accumulator); |
| |
| /** |
| * Standard reduce without identity, so the return is an Optional. The |
| * returned promise will be resolved when the channel closes. |
| * <p> |
| * This is a <strong>terminal operation</strong> |
| * |
| * @param accumulator The accumulator |
| * @return A promise that will be resolved with an Optional when the |
| * channel closes |
| */ |
| Promise<Optional<T>> reduce(BinaryOperator<T> accumulator); |
| |
| /** |
| * Standard reduce with identity, accumulator and combiner. The returned |
| * promise will be resolved when the channel closes. |
| * <p> |
| * This is a <strong>terminal operation</strong> |
| * |
| * @param <U> The type of the reduction result |
| * @param identity The identity/begin value |
| * @param accumulator The accumulator |
| * @param combiner combines two U's into one U (for example, combine two |
| * lists) |
| * @return The promise |
| */ |
| <U> Promise<U> reduce(U identity, BiFunction<U, ? super T,U> accumulator, |
| BinaryOperator<U> combiner); |
| |
| /** |
| * See Stream. Will resolve once the channel closes. |
| * <p> |
| * This is a <strong>terminal operation</strong> |
| * |
| * @param <R> The result type of the collector |
| * @param <A> The intermediate accumulation type of the collector |
| * @param collector The collector that is applied to the payloads |
| * @return A Promise representing the collected results |
| */ |
| <R, A> Promise<R> collect(Collector< ? super T,A,R> collector); |
| |
| /** |
| * See Stream. Will resolve once the channel closes. |
| * <p> |
| * This is a <strong>terminal operation</strong> |
| * |
| * @param comparator The comparator used to compare the payloads |
| * @return A Promise representing the minimum value. NOTE(review): this |
| * text previously said "or null if no values are seen before the |
| * end of the stream", which does not match the declared Optional |
| * type; presumably an empty Optional is used in that case (TODO |
| * confirm) |
| */ |
| Promise<Optional<T>> min(Comparator< ? super T> comparator); |
| |
| /** |
| * See Stream. Will resolve once the channel closes. |
| * <p> |
| * This is a <strong>terminal operation</strong> |
| * |
| * @param comparator The comparator used to compare the payloads |
| * @return A Promise representing the maximum value. NOTE(review): this |
| * text previously said "or null if no values are seen before the |
| * end of the stream", which does not match the declared Optional |
| * type; presumably an empty Optional is used in that case (TODO |
| * confirm) |
| */ |
| Promise<Optional<T>> max(Comparator< ? super T> comparator); |
| |
| /** |
| * See Stream. Will resolve once the channel closes. |
| * <p> |
| * This is a <strong>terminal operation</strong> |
| * |
| * @return A Promise representing the number of values in the stream |
| */ |
| Promise<Long> count(); |
| |
| /** |
| * Close the channel and resolve the promise with true when the predicate |
| * matches a payload. If the channel is closed before the predicate matches, |
| * the promise is resolved with false. |
| * <p> |
| * This is a <strong>short circuiting terminal operation</strong> |
| * |
| * @param predicate The predicate that is tested against the payloads |
| * @return A Promise that will resolve when an event matches the predicate, |
| * or the end of the stream is reached |
| */ |
| Promise<Boolean> anyMatch(Predicate< ? super T> predicate); |
| |
| /** |
| * Close the channel and resolve the promise with false when the predicate |
| * does not match a payload. If the channel is closed before, the promise |
| * is resolved with true. |
| * <p> |
| * This is a <strong>short circuiting terminal operation</strong> |
| * |
| * @param predicate The predicate that is tested against the payloads |
| * @return A Promise that will resolve when an event fails to match the |
| * predicate, or the end of the stream is reached |
| */ |
| Promise<Boolean> allMatch(Predicate< ? super T> predicate); |
| |
| /** |
| * Close the channel and resolve the promise with false when the predicate |
| * matches any payload. If the channel is closed before, the promise is |
| * resolved with true. |
| * <p> |
| * This is a <strong>short circuiting terminal operation</strong> |
| * |
| * @param predicate The predicate that is tested against the payloads |
| * @return A Promise that will resolve when an event matches the predicate, |
| * or the end of the stream is reached |
| */ |
| Promise<Boolean> noneMatch(Predicate< ? super T> predicate); |
| |
| /** |
| * Close the channel and resolve the promise with the first element. If the |
| * channel is closed before, the Optional will have no value. |
| * <p> |
| * This is a <strong>terminal operation</strong> |
| * |
| * @return a promise |
| */ |
| Promise<Optional<T>> findFirst(); |
| |
| /** |
| * Close the channel and resolve the promise with the first element. If the |
| * channel is closed before, the Optional will have no value. |
| * <p> |
| * This is a <strong>terminal operation</strong> |
| * |
| * @return a promise |
| */ |
| Promise<Optional<T>> findAny(); |
| |
| /** |
| * Pass on each event to another consumer until the stream is closed. |
| * <p> |
| * This is a <strong>terminal operation</strong> |
| * |
| * @param action The consumer that each event is passed on to |
| * @return a promise (the meaning of its Long value is not stated here) |
| */ |
| Promise<Long> forEachEvent(PushEventConsumer< ? super T> action); |
| |
| } |