Module iu.util
Package edu.iu

Class IuAsynchronousPipe<T>

java.lang.Object
edu.iu.IuAsynchronousPipe<T>
Type Parameters:
T - value type
All Implemented Interfaces:
AutoCloseable, Consumer<T>

public class IuAsynchronousPipe<T> extends Object implements Consumer<T>, AutoCloseable
Accepts values for asynchronous retrieval via Stream.

UML Class Diagram

A common use case is passing large sets of homogeneous values between heterogeneous components.

UML Sequence Diagram

Within an asynchronous process: the controlling component creates a pipe and passes to the receiving component. The receiving component detaches a Stream through which values are retrieved. Once connected in this fashion, the controlling component can asynchronously supply values to the receiving component.

Any time after creating the pipe,

  • the controlling component may:
    • asynchronously retrieve values from an external source and supply them to the pipe.
    • pause until
      • some or all of the values supplied to the pipe have been received.
      • the stream is closed by the receiving component.
  • The receiving component may:

Although IuAsynchronousPipe may be used as a simple queue on a single thread, it is intended to be used concurrently on multiple threads in order to distribute load related to loading and consuming large sets of homogeneous values.

See Also:
  • Constructor Summary

    Constructors
    Constructor
    Description
    Default constructor.
  • Method Summary

    Modifier and Type
    Method
    Description
    void
    accept(T value)
    Used by the controlling component to supply values to the receiving component via the Consumer interface.
    void
    Used by the controlling component to close the pipe.
    void
    Reports an error that occurred on either end of the pipe.
    long
    Gets a count of all values accepted by the pipe since opening.
    long
    Gets a count of values accepted by the pipe that have not yet been received.
    long
    Gets a count of all values received from the pipe since opening.
    boolean
    Determines if this pipe is closed.
    boolean
    Determines if both this pipe and its stream are closed.
    long
    pauseController(long receivedCount, Duration timeout)
    Pauses execution on the current thread until values have been received via stream().
    long
    Pauses execution until either a timeout interval expires or all values have been received from the pipe.
    long
    pauseReceiver(long acceptedCount, Duration timeout)
    Pauses execution on the current thread until new values are accepted onto the pipe.
    long
    Pauses execution until either a timeout interval expires or the pipe has been closed.
    Gets a sequential Stream for receiving values as they are accepted by the pipe.
     

    Methods inherited from class java.lang.Object

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait

    Methods inherited from interface java.util.function.Consumer

    andThen
  • Constructor Details

    • IuAsynchronousPipe

      public IuAsynchronousPipe()
      Default constructor.
  • Method Details

    • stream

      public Stream<T> stream() throws IllegalStateException
      Gets a sequential Stream for receiving values as they are accepted by the pipe.

      For a parallel stream, call BaseStream.parallel() on the stream returned by this method.

      The Stream returned by this method is not controlled by the pipe and may be retrieved once. The receiving component in control of the stream, and should call this method synchronously during initialization from the thread that creates the pipe. All aggregation details are out scope; all Stream operations are supported natively and without interference by an internally managed Spliterator.

      Returns:
      Stream
      Throws:
      IllegalStateException - if this method is invoked more than once
    • getAcceptedCount

      public long getAcceptedCount()
      Gets a count of all values accepted by the pipe since opening.
      Returns:
      count of accepted values
    • getReceivedCount

      public long getReceivedCount()
      Gets a count of all values received from the pipe since opening.
      Returns:
      count of received values
    • getPendingCount

      public long getPendingCount()
      Gets a count of values accepted by the pipe that have not yet been received.
      Returns:
      count of pending values
    • pauseController

      public long pauseController(long receivedCount, Duration timeout) throws TimeoutException, InterruptedException
      Pauses execution on the current thread until values have been received via stream().

      Typically, the controlling component will invoke this method during a processing loop to manage resource utilization rate relative to the rate values are being retrieved, then invoke pauseController(Instant) to pause until the receiving component has received all values or invoked BaseStream.close().

      The basic controller loop example below checks the pending count before iterating, and if there are 100 pending values in the pipe pauses until 10 of those values have been have been received, or up to PT1S, before scanning for and providing more values. Finally, the controller pauses after all values have been provided until either its workload controller expires, or all values have been received. The PT1S pause in this example represents a keep-alive pulse, for example if the loop is a live iterator over a connected resource then one business resource per second is typically a sufficient keep-alive interval.

       for (final var value : source.getValues()) {
              pipe.accept(value);
              if (pipe.getPendingCount() > 100)
                      try {
                              pipe.pauseController(10, Duration.ofSeconds(1L));
                      } catch (TimeoutException e) {
                              // keep-alive
                      }
       }
       pipe.pauseController(workload.getExpires());
       
      Parameters:
      receivedCount - count of received values to wait for; returns without delay if <= 0
      timeout - amount of time to wait; should be positive
      Returns:
      the actual number of values received while paused
      Throws:
      TimeoutException - if the timeout interval expires before receivedCount values are received
      InterruptedException - if the current thread is interrupted while waiting for values to be received
    • pauseController

      public long pauseController(Instant expires) throws InterruptedException
      Pauses execution until either a timeout interval expires or all values have been received from the pipe.

      Typically, the controlling component will invoke pauseController(int, Duration) during a processing loop to manage resource utilization rate relative to the rate values are being retrieved, then invoke this method to pause until the receiving component has received all values or invoked BaseStream.close().

      The basic controller loop example below checks the pending count before iterating, and if there are 100 pending values in the pipe pauses until 10 of those values have been have been received, or up to PT1S, before scanning for and providing more values. Finally, the controller pauses after all values have been provided until either its workload controller expires, or all values have been received. The PT1S pause in this example represents a keep-alive pulse, for example if the loop is a live iterator over a connected resource then one business resource per second is typically a sufficient keep-alive interval.

       for (final var value : source.getValues()) {
              pipe.accept(value);
              if (pipe.getPendingCount() > 100)
                      try {
                              pipe.pauseController(10, Duration.ofSeconds(1L));
                      } catch (TimeoutException e) {
                              // keep-alive
                      }
       }
       pipe.pauseController(workload.getExpires());
       
      Parameters:
      expires - instant the timeout interval expires
      Returns:
      the number of values received while paused
      Throws:
      InterruptedException - if the current thread is interrupted while waiting for values to be received
    • pauseReceiver

      public long pauseReceiver(long acceptedCount, Duration timeout) throws TimeoutException, InterruptedException
      Pauses execution on the current thread until new values are accepted onto the pipe.

      This method is useful for breaking up output into segments, i.e., via Spliterator.trySplit(), as in the example below:

       final var pipeSplitter = pipe.stream().spliterator();
       while (!pipe.isClosed()) {
              pipe.pauseReceiver(targetSplitSize, workload.getRemaining());
              Spliterator<String> split = pipeSplitter.trySplit();
              if (split != null)
                      final var segmentStream = StreamSupport.stream(split, true);
                      // perform terminal operation on segmentStream
       }
       final var tailStream = StreamSupport.stream(pipeSplitter, true);
       // perform terminal operation on tailStream
       
      Parameters:
      acceptedCount - count of newly accepted values to wait for; returns without delay if <= 0
      timeout - amount of time to wait; should be positive
      Returns:
      the actual number of values accepted while paused
      Throws:
      TimeoutException - if the timeout interval expires before receivedCount values are received
      InterruptedException - if the current thread is interrupted while waiting for values to be received
    • pauseReceiver

      public long pauseReceiver(Instant expires) throws InterruptedException
      Pauses execution until either a timeout interval expires or the pipe has been closed.

      This method is useful for waiting until the controlling component has completed all work before collecting values from the stream, to give the receiving component time-sensitive control over directly blocking via the stream.

      For example, to give the controlling component up to 15 seconds lead time, or for all values to be provided, before collecting from the pipe:

       pipe.pauseReceiver(Instant.now().plus(Duration.ofSeconds(15L));
       final var values = pipe.stream().collect(aCollector);
       
      Parameters:
      expires - instant the timeout interval expires
      Returns:
      the number of values accepted onto the pipe while paused
      Throws:
      InterruptedException - if the current thread is interrupted while waiting for the pipe to close
    • isClosed

      public boolean isClosed()
      Determines if this pipe is closed.
      Returns:
      true if closed; else return false
    • isCompleted

      public boolean isCompleted()
      Determines if both this pipe and its stream are closed.
      Returns:
      true if closed; else return false
    • accept

      public void accept(T value)
      Used by the controlling component to supply values to the receiving component via the Consumer interface.

      Values accepted by the pipe may be received via stream().

      Specified by:
      accept in interface Consumer<T>
      Parameters:
      value - supplied by the controlling component.
    • error

      public void error(Throwable e)
      Reports an error that occurred on either end of the pipe.

      The error will interrupt all activity and cause the pipe to close.

      Parameters:
      e - error
    • close

      public void close()
      Used by the controlling component to close the pipe.

      The receiving component should use BaseStream.close() instead of this method to close the pipe.

      Closing the pipe prevents further values from being accepted, then unpauses all threads.

      Specified by:
      close in interface AutoCloseable
      See Also:
    • toString

      public String toString()
      Overrides:
      toString in class Object