- Type Parameters:
T
- value type
- All Implemented Interfaces:
AutoCloseable
,Consumer<T>
Stream
instances over a shared
source subject.
Each subject is backed externally by a controlling component that:
- Provides an initial
split
of available values at the point in time a newsubscription
is created. Accepts
new values to be distributed to active subscribers.
After the initial split
is created, but
before the values available at subscription
time have
all been advanced
, newly
accepted
values are offered to a queue. Values
may be removed from the queue if also advanced from the initial
split. Queued values will be polled and advanced after the last split of the
initial split advances its last value.
After all queued values have been advanced, the subscriber
transitions to a dedicated IuAsynchronousPipe
and passes new values
through to IuAsynchronousPipe.accept(Object)
.
The subject makes no guarantee to the controlling
component when or if a subscriber will transition
from non-blocking access to available values, to potentially blocking access
via IuAsynchronousPipe
. It is, however, guaranteed that all values
available from the point in time the subscriber begins
processing the Stream
until unsubscribing will be
supplied exactly once regardless of how the value is actually delivered.
New values should be accepted
before
appending the external source, to gracefully avoid a potential race condition
between the initial split and an internal appended values queue. For example:
class MyControllingComponent<T> implements Consumer<T> { private final Queue<T> queue = new ConcurrentLinkedQueue<>(); private final IuAsynchronousSubject<T> subject = new IuAsynchronousSubject<>(queue::spliterator); @Override public void accept(T t) { subject.accept(t); queue.offer(t); } }
When the source is sequential, subscribers may expect to receive values in the order supplied.
Each subscriber Stream
provides all values that may
be retrieved without blocking, then transitions to an
IuAsynchronousPipe
managed by the subject to block
until new values become available.
A subscriber's Stream
may be closed
without affecting the status of the controlling
component, or of any other subscriber streams
.
This class is thread-safe and intended for use by high-volume parallel-processing workloads.
- See Also:
-
Constructor Summary
ConstructorsConstructorDescriptionIuAsynchronousSubject
(Supplier<Spliterator<T>> initialSplitSupplier) Creates a new subject. -
Method Summary
Modifier and TypeMethodDescriptionvoid
Distributes a value to all potentially blocking subscribers that have completed the transition to anIuAsynchronousPipe
.void
close()
Closes the subject.void
Reports a fatal error to all subscribers andcloses
the subject.
-
Constructor Details
-
IuAsynchronousSubject
Creates a new subject.- Parameters:
initialSplitSupplier
- supplies the initial split backing new subscriber streams.
-
-
Method Details
-
subscribe
Subscribes to aStream
that supplies all values available without blocking then blocks until new values are available or the subject isclosed
.- Returns:
IuAsynchronousSubscription
-
accept
Distributes a value to all potentially blocking subscribers that have completed the transition to anIuAsynchronousPipe
.This method does not supply values to subscribers that haven't yet completed the transition. The controlling component is responsible for independently supplying those values to it's
Spliterator
-supplying backing source. -
close
public void close()Closes the subject.Once closed:
- Existing subscribers may finish retrieving all values already supplied
- Blocking subscriber
Stream
s will be terminated gracefully - No new subscribers may be created
- No new values may be supplied
- Specified by:
close
in interfaceAutoCloseable
-
error
Reports a fatal error to all subscribers andcloses
the subject.- Parameters:
e
- fatal error
-