Module iu.util
Package edu.iu

Class IuAsynchronousSubject<T>

java.lang.Object
edu.iu.IuAsynchronousSubject<T>
Type Parameters:
T - value type
All Implemented Interfaces:
AutoCloseable, Consumer<T>

public class IuAsynchronousSubject<T> extends Object implements Consumer<T>, AutoCloseable
Provides subscriber Stream instances over a shared source subject.

Each subject is backed externally by a controlling component that:

  • Provides an initial split of available values at the point in time a new subscription is created.
  • Accepts new values to be distributed to active subscribers.

After the initial split is created, but before the values available at subscription time have all been advanced, newly accepted values are offered to a queue. Values may be removed from the queue if also advanced from the initial split. Queued values will be polled and advanced after the last split of the initial split advances its last value.

After all queued values have been advanced, the subscriber transitions to a dedicated IuAsynchronousPipe and passes new values through to IuAsynchronousPipe.accept(Object).

The subject makes no guarantee to the controlling component when or if a subscriber will transition from non-blocking access to available values, to potentially blocking access via IuAsynchronousPipe. It is, however, guaranteed that all values available from the point in time the subscriber begins processing the Stream until unsubscribing will be supplied exactly once regardless of how the value is actually delivered.

UML Sequence Diagram

New values should be accepted before appending the external source, to gracefully avoid a potential race condition between the initial split and an internal appended values queue. For example:

 class MyControllingComponent<T> implements Consumer<T> {
        private final Queue<T> queue = new ConcurrentLinkedQueue<>();
        private final IuAsynchronousSubject<T> subject =
                new IuAsynchronousSubject<>(queue::spliterator);
 
        @Override
        public void accept(T t) {
                subject.accept(t);
                queue.offer(t);
        }
 }
 

When the source is sequential, subscribers may expect to receive values in the order supplied.

Each subscriber Stream provides all values that may be retrieved without blocking, then transitions to an IuAsynchronousPipe managed by the subject to block until new values become available.

A subscriber's Stream may be closed without affecting the status of the controlling component, or of any other subscriber streams.

This class is thread-safe and intended for use by high-volume parallel-processing workloads.

See Also:
  • Constructor Details

    • IuAsynchronousSubject

      public IuAsynchronousSubject(Supplier<Spliterator<T>> initialSplitSupplier)
      Creates a new subject.
      Parameters:
      initialSplitSupplier - supplies the initial split backing new subscriber streams.
  • Method Details

    • subscribe

      public IuAsynchronousSubscription<T> subscribe()
      Subscribes to a Stream that supplies all values available without blocking then blocks until new values are available or the subject is closed.
      Returns:
      IuAsynchronousSubscription
    • accept

      public void accept(T value)
      Distributes a value to all potentially blocking subscribers that have completed the transition to an IuAsynchronousPipe.

      This method does not supply values to subscribers that haven't yet completed the transition. The controlling component is responsible for independently supplying those values to it's Spliterator-supplying backing source.

      Specified by:
      accept in interface Consumer<T>
      Parameters:
      value - value to supply to all subscribers
    • close

      public void close()
      Closes the subject.

      Once closed:

      • Existing subscribers may finish retrieving all values already supplied
      • Blocking subscriber Streams will be terminated gracefully
      • No new subscribers may be created
      • No new values may be supplied
      Specified by:
      close in interface AutoCloseable
    • error

      public void error(Throwable e)
      Reports a fatal error to all subscribers and closes the subject.
      Parameters:
      e - fatal error