// IuAsynchronousSubject.java
/*
* Copyright © 2024 Indiana University
* All rights reserved.
*
* BSD 3-Clause License
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* - Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package edu.iu;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.Queue;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
* Provides <strong>subscriber</strong> {@link Stream} instances over a shared
* source <strong>subject</strong>.
*
* <p>
* Each <strong>subject</strong> is backed externally by a <strong>controlling
* component</strong> that:
* </p>
* <ul>
* <li>Provides an <strong>initial {@link Spliterator split}</strong> of
* available values at the point in time a new {@link #subscribe() subscription}
* is created.</li>
* <li>{@link #accept(Object) Accepts} new values to be distributed to active
* <strong>subscribers</strong>.</li>
* </ul>
*
* <p>
* After the <strong>initial {@link Spliterator split}</strong> is created, but
* before the values available at {@link #subscribe() subscription} time have
* all been {@link Spliterator#tryAdvance(Consumer) advanced}, newly
* {@link #accept(Object) accepted} values are offered to a queue. Values
* <em>may</em> be removed from the queue if also advanced from the initial
* split. Queued values will be polled and advanced after the last split of the
* initial split advances its last value.
* </p>
*
* <p>
* After all queued values have been advanced, the <strong>subscriber</strong>
* transitions to a dedicated {@link IuAsynchronousPipe} and passes new values
* through to {@link IuAsynchronousPipe#accept(Object)}.
* </p>
*
* <p>
* The <strong>subject</strong> makes no guarantee to the <strong>controlling
* component</strong> when or if a <strong>subscriber</strong> will transition
* from non-blocking access to available values, to potentially blocking access
* via {@link IuAsynchronousPipe}. It is, however, guaranteed that all values
* available from the point in time the <strong>subscriber</strong> begins
* processing the {@link Stream} until <strong>unsubscribing</strong> will be
* supplied exactly once regardless of how the value is actually delivered.
* </p>
*
* <img src="doc-files/IuAsynchronousSubject.svg" alt="UML Sequence Diagram">
*
* <p>
* New values <em>should</em> be {@link #accept(Object) accepted} before
* appending the external source, to gracefully avoid a potential race condition
* between the initial split and an internal appended values queue. For example:
* </p>
*
* <pre>
* class MyControllingComponent&lt;T&gt; implements Consumer&lt;T&gt; {
* private final Queue&lt;T&gt; queue = new ConcurrentLinkedQueue&lt;&gt;();
* private final IuAsynchronousSubject&lt;T&gt; subject =
* new IuAsynchronousSubject&lt;&gt;(queue::spliterator);
*
* {@literal @}Override
* public void accept(T t) {
* subject.accept(t);
* queue.offer(t);
* }
* }
* </pre>
*
* <p>
* When the <strong>source</strong> is sequential, <strong>subscribers</strong>
* <em>may</em> expect to receive values in the order supplied.
* </p>
*
* <p>
* Each <strong>subscriber</strong> {@link Stream} provides all values that may
* be retrieved without blocking, then transitions to an
* {@link IuAsynchronousPipe} managed by the <strong>subject</strong> to block
* until new values become available.
* </p>
*
* <p>
* A <strong>subscriber's</strong> {@link Stream} may be {@link Stream#close()
* closed} without affecting the status of the <strong>controlling
* component</strong>, or of any other <strong>subscriber</strong> {@link Stream
* streams}.
* </p>
*
* <p>
* This class is thread-safe and intended for use by high-volume
* parallel-processing workloads.
* </p>
*
* @param <T> value type
* @see IuAsynchronousPipe
*/
public class IuAsynchronousSubject<T> implements Consumer<T>, AutoCloseable {
/**
 * Child {@link Spliterator} wrapping one portion split off a subscriber's
 * initial split.
 *
 * <p>
 * Each instance registers itself in the owning subscriber's {@code children}
 * queue on construction. Once its wrapped portion is used up, the
 * {@code delegate} reference is cleared and further requests are forwarded to
 * the subscriber's {@code continueAdvance} / {@code continueForEach}.
 * </p>
 */
private class SourceSplit implements Spliterator<T> {
    /** Subscriber this split was created for. */
    private final Subscriber subscriber;
    /** Wrapped portion of the initial split; null once used up. */
    private volatile Spliterator<T> delegate;

    /**
     * Wraps a split portion and registers it with the subscriber.
     *
     * @param delegate   portion of the initial split to wrap
     * @param subscriber owning subscriber
     */
    private SourceSplit(Spliterator<T> delegate, Subscriber subscriber) {
        this.delegate = delegate;
        this.subscriber = subscriber;
        subscriber.children.offer(this);
    }

    /**
     * Advances the wrapped portion if it still has a value; otherwise clears it
     * and lets the subscriber continue.
     */
    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        final var source = delegate;
        if (source != null //
                && source.tryAdvance(subscriber.cancelAcceptedValueAfterAction(action))) {
            // release the portion as soon as it reports nothing left
            if (source.estimateSize() == 0) {
                delegate = null;
            }
            return true;
        }

        delegate = null;
        return subscriber.continueAdvance(action);
    }

    /**
     * Drains the wrapped portion, clears it, then lets the subscriber continue.
     */
    @Override
    public void forEachRemaining(Consumer<? super T> action) {
        final var source = delegate;
        if (source != null) {
            source.forEachRemaining(subscriber.cancelAcceptedValueAfterAction(action));
            delegate = null;
        }
        subscriber.continueForEach(action);
    }

    /**
     * Splits the wrapped portion further, wrapping the result as another child
     * of the same subscriber.
     *
     * @return new child split, or null if the portion is used up or cannot be
     *         split
     */
    @Override
    public Spliterator<T> trySplit() {
        final var source = delegate;
        final var split = source == null ? null : source.trySplit();
        return split == null ? null : new SourceSplit(split, subscriber);
    }

    /**
     * Reports the wrapped portion's estimate, plus the subscriber's queued
     * accepted values when the subscriber is exhausted or this is its only
     * registered child.
     */
    @Override
    public long estimateSize() {
        final var source = delegate;
        var total = source == null ? 0L : source.estimateSize();

        final var countsAccepted = subscriber.isExhausted() //
                || (subscriber.children.size() == 1 //
                        && subscriber.children.contains(this));
        if (countsAccepted) {
            total += subscriber.acceptedSize();
        }
        return total;
    }

    /**
     * Mirrors the wrapped portion's characteristics while it exists;
     * {@link Spliterator#SIZED} afterwards.
     */
    @Override
    public int characteristics() {
        final var source = delegate;
        return source != null ? source.characteristics() : SIZED;
    }
}
/**
 * Root {@link Spliterator} and subscription handle for a single
 * <strong>subscriber</strong>.
 *
 * <p>
 * Values are drawn, in order, from the initial split (and any
 * {@link SourceSplit children} split from it), from the {@code accepted} queue
 * of values received while the initial values were still being advanced, and
 * finally from an {@link IuAsynchronousPipe} created on demand by
 * {@code bootstrapPipe()}.
 * </p>
 */
private class Subscriber implements Spliterator<T>, IuAsynchronousSubscription<T> {
    /** Sequential stream over this spliterator; closing it closes the subscriber. */
    private final Stream<T> stream;
    /** Splits created from the initial split that may still hold values. */
    private final Queue<SourceSplit> children = new ConcurrentLinkedDeque<>();
    /** Values accepted while the initial split or this queue was not yet drained. */
    private final Queue<T> accepted = new ConcurrentLinkedQueue<>();
    /** Initial split; null once drained (or when it was empty to begin with). */
    private volatile Spliterator<T> delegate;
    /** Number of values delivered to this subscriber through {@code accept}. */
    private volatile long acceptedCount;
    /** Error reported before a pipe existed; rethrown to the stream consumer. */
    private volatile Throwable error;
    /** Set when closed before a pipe existed. */
    private volatile boolean closed;
    /** Pipe receiving new values once the initial values and queue are drained. */
    private volatile IuAsynchronousPipe<T> pipe;
    /** Spliterator of the pipe's stream; non-null exactly when {@code pipe} is. */
    private volatile Spliterator<T> pipedSplit;

    /**
     * Obtains the initial split (kept only when it reports a non-zero estimated
     * size), registers this subscriber with the subject, and creates the stream.
     */
    private Subscriber() {
        final var delegate = initialSplitSupplier.get();
        if (delegate.estimateSize() > 0)
            this.delegate = delegate;
        subscribers.offer(this);
        stream = StreamSupport.stream(this, false).onClose(this::close);
    }

    /**
     * Splits the initial split when possible, registering the result as a child.
     * Otherwise rethrows a reported error, or defers to the pipe's spliterator
     * once it exists.
     */
    @Override
    public Spliterator<T> trySplit() {
        final var delegate = this.delegate;
        if (delegate != null) {
            final var split = delegate.trySplit();
            if (split != null)
                return new SourceSplit(split, this);
        }

        if (error != null)
            throw IuException.unchecked(error);
        else if (pipedSplit != null)
            return pipedSplit.trySplit();
        else
            return null;
    }

    /**
     * Advances one value from the initial split, else from the accepted queue,
     * else from the pipe (creating the pipe first if needed).
     */
    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        final var delegate = this.delegate;
        if (delegate != null //
                && delegate.tryAdvance(cancelAcceptedValueAfterAction(action))) {
            if (delegate.estimateSize() == 0)
                this.delegate = null;
            return true;
        } else
            this.delegate = null;

        if (continueAdvance(action)) {
            // nothing left to advance from the queue: switch to the pipe now
            if (!canAdvance())
                bootstrapPipe();
            return true;
        }

        bootstrapPipe();
        if (error != null)
            throw IuException.unchecked(error);
        else if (pipedSplit != null)
            return pipedSplit.tryAdvance(action);
        else
            return false;
    }

    /**
     * Drains the initial split and the accepted queue, then hands the action to
     * the pipe's spliterator for all remaining values.
     */
    @Override
    public void forEachRemaining(Consumer<? super T> action) {
        final var delegate = this.delegate;
        if (delegate != null) {
            delegate.forEachRemaining(cancelAcceptedValueAfterAction(action));
            this.delegate = null;
        }
        continueForEach(action);

        bootstrapPipe();
        if (error != null)
            throw IuException.unchecked(error);
        else if (pipedSplit != null)
            pipedSplit.forEachRemaining(action);
    }

    /**
     * Sums the values currently held for this subscriber: the initial split's
     * estimate when it is {@link Spliterator#SIZED}, the accepted queue once all
     * children are exhausted, and the pipe's pending count.
     */
    @Override
    public synchronized long available() {
        // long accumulator: an int would silently truncate large estimates
        var count = 0L;

        // snapshot: tryAdvance clears the field without holding this monitor
        final var delegate = this.delegate;
        if (delegate != null //
                && delegate.hasCharacteristics(SIZED))
            count += delegate.estimateSize();

        if (areChildrenExhausted())
            count += acceptedSize();

        final var pipe = this.pipe;
        if (pipe != null)
            count += pipe.getPendingCount();

        return count;
    }

    /**
     * Before the pipe exists, reports {@link Long#MAX_VALUE} while open and
     * {@link #available()} once closed or failed; afterwards defers to the pipe's
     * spliterator.
     */
    @Override
    public synchronized long estimateSize() {
        final var pipedSplit = this.pipedSplit;
        if (pipedSplit != null)
            return pipedSplit.estimateSize();

        if (!closed && error == null)
            return Long.MAX_VALUE;
        else
            return available();
    }

    /**
     * Before the pipe exists, reports {@link Spliterator#CONCURRENT} while open
     * and {@code IMMUTABLE | SIZED} once closed; afterwards defers to the pipe's
     * spliterator.
     */
    @Override
    public int characteristics() {
        final var pipedSplit = this.pipedSplit;
        if (pipedSplit != null) {
            return pipedSplit.characteristics();
        }
        if (!isClosed()) {
            return CONCURRENT;
        } else {
            return IMMUTABLE | SIZED;
        }
    }

    @Override
    public Stream<T> stream() {
        return stream;
    }

    /**
     * Waits, via {@code IuObject.waitFor} on this subscriber, until it is closed
     * or {@code acceptedCount} further values have been accepted.
     *
     * @param acceptedCount number of newly accepted values to wait for; values
     *                      less than or equal to zero return immediately
     * @param timeout       maximum time to wait
     * @return number of values accepted since the wait began
     * @throws TimeoutException     presumably raised by {@code IuObject.waitFor}
     *                              when the timeout expires first (not visible
     *                              here)
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public long pause(long acceptedCount, Duration timeout) throws TimeoutException, InterruptedException {
        if (acceptedCount <= 0L)
            return 0L;

        final var expires = Instant.now().plus(timeout);
        final var initCount = this.acceptedCount;
        final var targetCount = initCount + acceptedCount;

        IuObject.waitFor(this, //
                () -> isClosed() //
                        || this.acceptedCount >= targetCount //
                , expires);

        return this.acceptedCount - initCount;
    }

    /**
     * Waits on this subscriber's monitor until it is closed or {@code expires}
     * has passed.
     *
     * @param expires instant at which to stop waiting
     * @return number of values accepted since the wait began
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public long pause(Instant expires) throws InterruptedException {
        final var initCount = acceptedCount;
        synchronized (this) {
            while (!isClosed()) {
                final var now = Instant.now();
                if (now.isBefore(expires)) {
                    final var waitFor = Duration.between(now, expires);
                    // toMillis() already covers whole milliseconds; pass only the
                    // sub-millisecond remainder as the nanosecond adjustment
                    this.wait(waitFor.toMillis(), waitFor.toNanosPart() % 1_000_000);
                } else
                    break;
            }
        }
        return acceptedCount - initCount;
    }

    /**
     * Reports the pipe's closed state once it exists; before that, whether this
     * subscriber was closed or an error was reported.
     */
    @Override
    public synchronized boolean isClosed() {
        if (pipe == null)
            return closed || error != null;
        else
            return pipe.isClosed();
    }

    /**
     * Forwards the error to the pipe if it exists, otherwise records it for the
     * next spliterator call; wakes any threads waiting on this subscriber.
     */
    @Override
    public synchronized void error(Throwable e) {
        if (pipe != null)
            pipe.error(e);
        else
            error = e;
        this.notifyAll();
    }

    /**
     * Unregisters from the subject and closes the pipe if it exists, otherwise
     * marks this subscriber closed; wakes any threads waiting on this subscriber.
     */
    @Override
    public synchronized void close() {
        subscribers.remove(this);
        if (pipe != null)
            pipe.close();
        else
            closed = true;
        this.notifyAll();
    }

    /** True when neither the initial split nor any child holds a delegate. */
    private boolean isExhausted() {
        if (delegate != null)
            return false;
        return areChildrenExhausted();
    }

    /** True while initial values or queued accepted values remain to advance. */
    private boolean canAdvance() {
        return !isExhausted() || !accepted.isEmpty();
    }

    /** True while new values should be queued rather than sent to the pipe. */
    private boolean canAccept() {
        return delegate != null || !accepted.isEmpty();
    }

    /**
     * Removes leading children whose delegate was cleared; stops at the first
     * child that still has one.
     *
     * @return true if no child with a delegate was found
     */
    private boolean areChildrenExhausted() {
        final var i = children.iterator();
        while (i.hasNext())
            if (i.next().delegate == null)
                i.remove();
            else
                return false;
        return true;
    }

    private int acceptedSize() {
        return accepted.size();
    }

    /**
     * Wraps an action so that, after it handles a value from the initial split,
     * one equal entry is removed from the accepted queue and waiters are woken.
     * This keeps a value seen through both paths from being delivered twice.
     */
    private Consumer<? super T> cancelAcceptedValueAfterAction(Consumer<? super T> action) {
        return value -> {
            action.accept(value);
            synchronized (this) {
                accepted.remove(value);
                this.notifyAll();
            }
        };
    }

    /**
     * Once exhausted, passes the next queued accepted value to the action.
     *
     * @return true if a value was delivered
     */
    private boolean continueAdvance(Consumer<? super T> action) {
        if (!isExhausted())
            return false;

        final var value = accepted.poll();
        if (value == null)
            return false;

        action.accept(value);
        synchronized (this) {
            this.notifyAll();
        }
        return true;
    }

    /**
     * Once exhausted, passes every queued accepted value to the action.
     */
    private void continueForEach(Consumer<? super T> action) {
        if (!isExhausted())
            return;

        // poll until drained rather than isEmpty()/poll(), so the action can
        // never be handed a null from an emptied queue
        T value;
        while ((value = accepted.poll()) != null) {
            action.accept(value);
            synchronized (this) {
                this.notifyAll();
            }
        }
    }

    /**
     * Creates the pipe and its spliterator, unless a pipe already exists or this
     * subscriber was closed or failed first.
     */
    private synchronized void bootstrapPipe() {
        if (pipe == null //
                && error == null //
                && !closed) {
            pipe = new IuAsynchronousPipe<>();
            pipedSplit = pipe.stream().spliterator();
        }
    }

    /**
     * Receives a new value from the subject: queued while initial or queued
     * values remain, otherwise passed to the pipe.
     *
     * <p>
     * A subscriber that was closed or failed before its pipe was created has no
     * pipe to pass the value to; the value is dropped in that case.
     * </p>
     */
    private void accept(T t) {
        synchronized (this) {
            if (canAccept()) {
                accepted.offer(t);
            } else {
                bootstrapPipe();
                final var pipe = this.pipe;
                if (pipe == null) {
                    // bootstrapPipe() is a no-op once closed or failed
                    return;
                }
                pipe.accept(t);
            }
            acceptedCount++;
            this.notifyAll();
        }
    }
}
/** Supplies the initial split backing each new subscriber. */
private final Supplier<Spliterator<T>> initialSplitSupplier;
/** Registered subscribers, in the order they were offered. */
private final Queue<Subscriber> subscribers = new ConcurrentLinkedQueue<>();
/**
 * Set once the subject has been closed or failed. Volatile because
 * {@code subscribe()} reads it without holding the subject's monitor, while
 * writers hold it.
 */
private volatile boolean closed;
/**
 * Creates a new <strong>subject</strong>.
 *
 * @param initialSplitSupplier supplies the initial split backing new
 *                             <strong>subscriber</strong> streams; must not be
 *                             null
 * @throws NullPointerException if {@code initialSplitSupplier} is null
 */
public IuAsynchronousSubject(Supplier<Spliterator<T>> initialSplitSupplier) {
    // fail at construction rather than on the first subscribe()
    this.initialSplitSupplier = Objects.requireNonNull(initialSplitSupplier, "initialSplitSupplier");
}
/**
 * Creates a new <strong>subscription</strong> whose {@link Stream} first
 * supplies every value available without blocking, then blocks until new values
 * arrive or the <strong>subject</strong> is {@link #close() closed}.
 *
 * @return {@link IuAsynchronousSubscription}
 * @throws IllegalStateException if the subject has already been closed
 */
public IuAsynchronousSubscription<T> subscribe() {
    if (!closed) {
        return new Subscriber();
    }
    throw new IllegalStateException("closed");
}
/**
 * Distributes a value to every registered <strong>subscriber</strong>.
 *
 * <p>
 * Each <strong>subscriber</strong> decides how to take the value: it is queued
 * while the subscriber still has initial or queued values to advance, and
 * passed to the subscriber's {@link IuAsynchronousPipe} otherwise.
 * </p>
 *
 * <p>
 * This method does not append the value to the backing <strong>source</strong>.
 * The <strong>controlling component</strong> is responsible for independently
 * supplying the value to its {@link Spliterator}-supplying backing
 * <strong>source</strong>.
 * </p>
 *
 * @param value value to supply to all <strong>subscribers</strong>
 * @throws IllegalStateException if the subject has been closed
 */
@Override
public synchronized void accept(T value) {
    if (closed) {
        throw new IllegalStateException("closed");
    }
    for (final var subscriber : subscribers) {
        subscriber.accept(value);
    }
}
/**
 * Closes the <strong>subject</strong>.
 *
 * <p>
 * Once closed:
 * </p>
 * <ul>
 * <li>Existing <strong>subscribers</strong> may finish retrieving all values
 * already supplied</li>
 * <li>Blocking <strong>subscriber</strong> {@link Stream}s will be terminated
 * gracefully</li>
 * <li>No new <strong>subscribers</strong> may be created</li>
 * <li>No new <strong>values</strong> may be supplied</li>
 * </ul>
 *
 * <p>
 * Calling this method on an already closed subject has no effect. Every
 * registered subscriber is closed even if an earlier one fails; failures are
 * collected through {@code IuException.suppress} and rethrown afterwards.
 * </p>
 */
@Override
public synchronized void close() {
    if (closed)
        return;
    closed = true;

    Throwable failure = null;
    // poll() alone decides termination: a subscriber may remove itself from
    // another thread between an isEmpty() check and the poll()
    Subscriber next;
    while ((next = subscribers.poll()) != null) {
        final var subscriber = next;
        failure = IuException.suppress(failure, () -> subscriber.close());
    }

    if (failure != null)
        throw IuException.unchecked(failure);
}
/**
 * Reports a fatal error to all <strong>subscribers</strong> and {@link #close()
 * closes} the <strong>subject</strong>.
 *
 * <p>
 * The subject is marked closed before the error is distributed, and each
 * subscriber is unregistered as it is notified. Notification goes through
 * {@code IuException.suppress}, so a failure from one subscriber does not
 * prevent the remaining ones from being notified.
 * </p>
 *
 * @param e fatal error
 */
public synchronized void error(Throwable e) {
    // same ordering as close(): refuse new subscriptions before distributing
    closed = true;

    // poll() alone decides termination: a subscriber may remove itself from
    // another thread between an isEmpty() check and the poll()
    Subscriber next;
    while ((next = subscribers.poll()) != null) {
        final var subscriber = next;
        IuException.suppress(e, () -> subscriber.error(e));
    }
}
}