- Type Parameters:
T
- value type
- All Implemented Interfaces:
AutoCloseable
,Consumer<T>
Accepts
values for asynchronous retrieval via
Stream
.
A common use case is passing large sets of homogeneous values between heterogeneous components.
Within an asynchronous process: the controlling component
creates a pipe and passes to the receiving component. The
receiving component detaches a Stream
through which values are retrieved. Once connected in this
fashion, the controlling component can asynchronously
supply
values to the receiving
component.
Any time after creating the pipe,
- the controlling component may:
- The receiving component may:
split
the stream.pause
until values have beenaccepted
.- block until values sufficient to complete a
terminal operation
have beenaccepted
.
Although IuAsynchronousPipe
may be used as a simple queue on a single
thread, it is intended to be used concurrently on multiple threads in order
to distribute load related to loading and consuming large sets of homogeneous
values.
- See Also:
-
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionvoid
Used by the controlling component to supply values to the receiving component via theConsumer
interface.void
close()
Used by the controlling component to close the pipe.void
Reports an error that occurred on either end of the pipe.long
Gets a count of all values accepted by the pipe since opening.long
Gets a count of values accepted by the pipe that have not yet been received.long
Gets a count of all values received from the pipe since opening.boolean
isClosed()
Determines if this pipe is closed.boolean
Determines if both this pipe and itsstream
are closed.long
pauseController
(long receivedCount, Duration timeout) Pauses execution on the current thread until values have been received viastream()
.long
pauseController
(Instant expires) Pauses execution until either a timeout interval expires or all values have been received from the pipe.long
pauseReceiver
(long acceptedCount, Duration timeout) Pauses execution on the current thread until new values areaccepted
onto the pipe.long
pauseReceiver
(Instant expires) Pauses execution until either a timeout interval expires or the pipe has been closed.stream()
toString()
-
Constructor Details
-
IuAsynchronousPipe
public IuAsynchronousPipe()Default constructor.
-
-
Method Details
-
stream
Gets a sequentialStream
for receiving values as they areaccepted
by the pipe.For a parallel stream, call
BaseStream.parallel()
on the stream returned by this method.The
Stream
returned by this method is not controlled by the pipe and may be retrieved once. The receiving component in control of the stream, and should call this method synchronously during initialization from the thread thatcreates the pipe
. All aggregation details are out scope; allStream
operations are supported natively and without interference by an internally managedSpliterator
.- Returns:
Stream
- Throws:
IllegalStateException
- if this method is invoked more than once
-
getAcceptedCount
public long getAcceptedCount()Gets a count of all values accepted by the pipe since opening.- Returns:
- count of accepted values
-
getReceivedCount
public long getReceivedCount()Gets a count of all values received from the pipe since opening.- Returns:
- count of received values
-
getPendingCount
public long getPendingCount()Gets a count of values accepted by the pipe that have not yet been received.- Returns:
- count of pending values
-
pauseController
public long pauseController(long receivedCount, Duration timeout) throws TimeoutException, InterruptedException Pauses execution on the current thread until values have been received viastream()
.Typically, the controlling component will invoke this method during a processing loop to manage resource utilization rate relative to the rate values are being retrieved, then invoke
pauseController(Instant)
to pause until the receiving component has received all values or invokedBaseStream.close()
.The basic controller loop example below checks the pending count before iterating, and if there are 100 pending values in the pipe pauses until 10 of those values have been have been received, or up to PT1S, before scanning for and providing more values. Finally, the controller pauses after all values have been provided until either its
workload controller
expires, or all values have been received. The PT1S pause in this example represents a keep-alive pulse, for example if the loop is a live iterator over a connected resource then one business resource per second is typically a sufficient keep-alive interval.for (final var value : source.getValues()) { pipe.accept(value); if (pipe.getPendingCount() > 100) try { pipe.pauseController(10, Duration.ofSeconds(1L)); } catch (TimeoutException e) { // keep-alive } } pipe.pauseController(workload.getExpires());
- Parameters:
receivedCount
- count of received values to wait for; returns without delay if <= 0timeout
- amount of time to wait; should be positive- Returns:
- the actual number of values received while paused
- Throws:
TimeoutException
- if the timeout interval expires beforereceivedCount
values are receivedInterruptedException
- if the current thread is interrupted while waiting for values to be received
-
pauseController
Pauses execution until either a timeout interval expires or all values have been received from the pipe.Typically, the controlling component will invoke
pauseController(int, Duration)
during a processing loop to manage resource utilization rate relative to the rate values are being retrieved, then invoke this method to pause until the receiving component has received all values or invokedBaseStream.close()
.The basic controller loop example below checks the pending count before iterating, and if there are 100 pending values in the pipe pauses until 10 of those values have been have been received, or up to PT1S, before scanning for and providing more values. Finally, the controller pauses after all values have been provided until either its
workload controller
expires, or all values have been received. The PT1S pause in this example represents a keep-alive pulse, for example if the loop is a live iterator over a connected resource then one business resource per second is typically a sufficient keep-alive interval.for (final var value : source.getValues()) { pipe.accept(value); if (pipe.getPendingCount() > 100) try { pipe.pauseController(10, Duration.ofSeconds(1L)); } catch (TimeoutException e) { // keep-alive } } pipe.pauseController(workload.getExpires());
- Parameters:
expires
- instant the timeout interval expires- Returns:
- the number of values received while paused
- Throws:
InterruptedException
- if the current thread is interrupted while waiting for values to be received
-
pauseReceiver
public long pauseReceiver(long acceptedCount, Duration timeout) throws TimeoutException, InterruptedException Pauses execution on the current thread until new values areaccepted
onto the pipe.This method is useful for breaking up output into segments, i.e., via
Spliterator.trySplit()
, as in the example below:final var pipeSplitter = pipe.stream().spliterator(); while (!pipe.isClosed()) { pipe.pauseReceiver(targetSplitSize, workload.getRemaining()); Spliterator<String> split = pipeSplitter.trySplit(); if (split != null) final var segmentStream = StreamSupport.stream(split, true); // perform terminal operation on segmentStream } final var tailStream = StreamSupport.stream(pipeSplitter, true); // perform terminal operation on tailStream
- Parameters:
acceptedCount
- count of newly accepted values to wait for; returns without delay if <= 0timeout
- amount of time to wait; should be positive- Returns:
- the actual number of values accepted while paused
- Throws:
TimeoutException
- if the timeout interval expires beforereceivedCount
values are receivedInterruptedException
- if the current thread is interrupted while waiting for values to be received
-
pauseReceiver
Pauses execution until either a timeout interval expires or the pipe has been closed.This method is useful for waiting until the controlling component has completed all work before collecting values from the stream, to give the receiving component time-sensitive control over directly blocking via the stream.
For example, to give the controlling component up to 15 seconds lead time, or for all values to be provided, before collecting from the pipe:
pipe.pauseReceiver(Instant.now().plus(Duration.ofSeconds(15L)); final var values = pipe.stream().collect(aCollector);
- Parameters:
expires
- instant the timeout interval expires- Returns:
- the number of values accepted onto the pipe while paused
- Throws:
InterruptedException
- if the current thread is interrupted while waiting for the pipe to close
-
isClosed
public boolean isClosed()Determines if this pipe is closed.- Returns:
- true if closed; else return false
-
isCompleted
public boolean isCompleted()Determines if both this pipe and itsstream
are closed.- Returns:
- true if closed; else return false
-
accept
Used by the controlling component to supply values to the receiving component via theConsumer
interface. -
error
Reports an error that occurred on either end of the pipe.The error will interrupt all activity and cause the pipe to close.
- Parameters:
e
- error
-
close
public void close()Used by the controlling component to close the pipe.The receiving component should use
BaseStream.close()
instead of this method to close the pipe.Closing the pipe prevents further values from being
accepted
, then unpauses all threads.- Specified by:
close
in interfaceAutoCloseable
- See Also:
-
toString
-