- Type Parameters:
T - value type
- All Implemented Interfaces:
AutoCloseable, Consumer<T>
Stream instances over a shared
source subject.
Each subject is backed externally by a controlling component that:
- Provides an initial split of available values at the point in time a new subscription is created.
- Accepts new values to be distributed to active subscribers.
After the initial split is created, but
before the values available at subscription time have
all been advanced, newly
accepted values are offered to a queue. Values
may be removed from the queue if also advanced from the initial
split. Queued values will be polled and advanced after the last split of the
initial split advances its last value.
After all queued values have been advanced, the subscriber
transitions to a dedicated IuAsynchronousPipe and passes new values
through to IuAsynchronousPipe.accept(Object).
The subject makes no guarantee to the controlling
component as to when, or whether, a subscriber will transition
from non-blocking access to available values to potentially blocking access
via IuAsynchronousPipe. It is, however, guaranteed that all values
available from the point in time the subscriber begins
processing the Stream until unsubscribing will be
supplied exactly once regardless of how the value is actually delivered.
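The non-blocking phases described above can be modeled with standard library types only. The following is a minimal sketch, not the library's implementation: the class name SubscriberPhasesSketch and its methods are hypothetical, values are assumed to be distinct so that a queued copy can be matched by equality, and the hand-over to a blocking pipe is only marked with a comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

/** Hypothetical model of one subscriber's non-blocking phases; not the library's code. */
class SubscriberPhasesSketch<T> {
    // Values available at subscription time.
    private final Spliterator<T> initialSplit;
    // Values accepted while the initial split is still draining.
    private final Queue<T> accepted = new ConcurrentLinkedQueue<>();

    SubscriberPhasesSketch(Spliterator<T> initialSplit) {
        this.initialSplit = initialSplit;
    }

    /** Called by the (hypothetical) subject for each newly accepted value. */
    void accept(T value) {
        accepted.offer(value);
    }

    /** Advances one value without blocking; false means nothing is available right now. */
    boolean tryAdvance(Consumer<? super T> action) {
        // Phase 1: drain the initial split, dropping any queued copy of the same value
        // (assumes distinct values).
        if (initialSplit.tryAdvance(v -> {
            accepted.remove(v);
            action.accept(v);
        })) {
            return true;
        }
        // Phase 2: drain values that were queued while the split was draining.
        T queued = accepted.poll();
        if (queued == null) {
            return false; // Phase 3 would hand over to a blocking pipe here.
        }
        action.accept(queued);
        return true;
    }

    /** Accepts "c" before appending it to the source, then drains everything. */
    static List<String> demo() {
        Queue<String> source = new ConcurrentLinkedQueue<>(List.of("a", "b"));
        SubscriberPhasesSketch<String> subscriber =
                new SubscriberPhasesSketch<>(source.spliterator());
        subscriber.accept("c"); // accepted first ...
        source.offer("c");      // ... then appended to the source
        List<String> seen = new ArrayList<>();
        while (subscriber.tryAdvance(seen::add)) {
            // keep advancing until no value is available
        }
        return seen;
    }
}
```

In this model, "c" is delivered once whether or not the weakly consistent spliterator of the source also observes it, because the queued copy is removed when the split advances the same value.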
New values should be accepted by the subject before they are
appended to the external source, to gracefully avoid a potential race condition
between the initial split and the internal queue of appended values. For example:
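import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

class MyControllingComponent<T> implements Consumer<T> {
    // External source backing the initial split of each new subscriber.
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final IuAsynchronousSubject<T> subject =
            new IuAsynchronousSubject<>(queue::spliterator);

    @Override
    public void accept(T t) {
        subject.accept(t); // distribute to active subscribers first
        queue.offer(t);    // then append to the external source
    }
}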
When the source is sequential, subscribers may expect to receive values in the order supplied.
Each subscriber Stream provides all values that may
be retrieved without blocking, then transitions to an
IuAsynchronousPipe managed by the subject to block
until new values become available.
A subscriber's Stream may be closed without affecting the status of the controlling
component, or of any other subscriber streams.
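The independence of subscriber streams can be illustrated with a small standard-library model. This is a sketch under stated assumptions, not the library's implementation: IndependentSubscribersSketch and subscriberCount() are hypothetical names, and the stream here simply ends when its queue is empty instead of blocking for new values.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Stream;

/** Hypothetical subject model: each subscriber owns its queue and close handler. */
class IndependentSubscribersSketch<T> {
    private final List<Queue<T>> subscribers = new CopyOnWriteArrayList<>();

    /** Registers a per-subscriber queue; closing the stream deregisters only that queue. */
    Stream<T> subscribe() {
        Queue<T> mine = new ConcurrentLinkedQueue<>();
        subscribers.add(mine);
        // Non-blocking for brevity: the stream ends at the first empty poll.
        return Stream.generate(mine::poll)
                .takeWhile(v -> v != null)
                .onClose(() -> subscribers.remove(mine));
    }

    /** Offers a value to every subscriber that is still registered. */
    void accept(T value) {
        for (Queue<T> queue : subscribers) {
            queue.offer(value);
        }
    }

    /** Number of subscribers that have not closed their stream. */
    int subscriberCount() {
        return subscribers.size();
    }
}
```

Closing one stream returned by subscribe() removes only its own queue, so later calls to accept still reach the remaining subscribers.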
This class is thread-safe and intended for use by high-volume parallel-processing workloads.
-
Constructor Summary
Constructors
| Constructor | Description |
| --- | --- |
| IuAsynchronousSubject(Supplier<Spliterator<T>> initialSplitSupplier) | Creates a new subject. |
-
Method Summary
| Modifier and Type | Method | Description |
| --- | --- | --- |
| void | accept | Distributes a value to all potentially blocking subscribers that have completed the transition to an IuAsynchronousPipe. |
| void | close() | Closes the subject. |
| void | error | Reports a fatal error to all subscribers and closes the subject. |
-
Constructor Details
-
IuAsynchronousSubject
Creates a new subject.
- Parameters:
initialSplitSupplier - supplies the initial split backing new subscriber streams.
-
-
Method Details
-
subscribe
Subscribes to a Stream that supplies all values available without blocking, then blocks until new values are available or the subject is closed.
- Returns:
IuAsynchronousSubscription
-
accept
Distributes a value to all potentially blocking subscribers that have completed the transition to an IuAsynchronousPipe.
This method does not supply values to subscribers that haven't yet completed the transition. The controlling component is responsible for independently supplying those values to its
Spliterator-supplying backing source.
-
close
public void close()
Closes the subject.
Once closed:
- Existing subscribers may finish retrieving all values already supplied
- Blocking subscriber Streams will be terminated gracefully
- No new subscribers may be created
- No new values may be supplied
- Specified by:
close in interface AutoCloseable
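The close behavior listed above can be sketched with a blocking queue and an end marker. This is a minimal model, not the library's implementation: ClosablePipeSketch and next() are hypothetical names, and throwing IllegalStateException when a value is supplied after close is an assumption made for illustration.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical pipe model showing graceful termination on close. */
class ClosablePipeSketch<T> implements AutoCloseable {
    private static final Object END = new Object(); // end-of-stream marker
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private boolean closed;

    /** Supplies a value; rejected once the pipe is closed (assumed behavior). */
    synchronized void accept(T value) {
        if (closed) {
            throw new IllegalStateException("closed");
        }
        queue.add(value);
    }

    /** Marks the end of the pipe; values already supplied stay retrievable. */
    @Override
    public synchronized void close() {
        if (!closed) {
            closed = true;
            queue.add(END);
        }
    }

    /** Blocks until a value is available or the pipe is closed and drained. */
    @SuppressWarnings("unchecked")
    Optional<T> next() {
        try {
            Object value = queue.take();
            if (value == END) {
                queue.add(END); // keep the marker so later calls also end
                return Optional.empty();
            }
            return Optional.of((T) value);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return Optional.empty();
        }
    }
}
```

A reader of this model drains every value supplied before close() and then receives an empty result instead of blocking indefinitely.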
-
error
Reports a fatal error to all subscribers and closes the subject.
- Parameters:
e - fatal error
-