IuEnumerableQueue.java
/*
* Copyright © 2025 Indiana University
* All rights reserved.
*
* BSD 3-Clause License
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* - Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package edu.iu;
import java.util.ArrayDeque;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
* Implements a simple non-blocking FIFO queue based on the {@link Consumer},
* and {@link Enumeration}, and {@link BooleanSupplier} interfaces.
*
* <p>
* In this scenario, {@link #getAsBoolean()}, {@link #hasNext()}, and
* {@link #hasMoreElements()} are synonymous.
* </p>
*
* <p>
* This class is thread-safe and intended for use in high-volume parallel
* processing scenarios.
* </p>
*
* <p>
* For blocking behavior, use
* {@link IuObject#waitFor(Object, BooleanSupplier, java.time.Duration)} or
* {@link IuObject#waitFor(Object, BooleanSupplier, java.time.Instant)} as in
* the example below:
* </p>
*
* <pre>
* IuObject.waitFor(enumerableQueue, enumerableQueue, Duration.ofSeconds(5L));
* </pre>
*
* @param <T> element type
*/
public class IuEnumerableQueue<T>
implements Consumer<T>, BooleanSupplier, Enumeration<T>, Iterator<T>, Spliterator<T>, Iterable<T> {
private final ConcurrentLinkedQueue<Optional<T>> queue = new ConcurrentLinkedQueue<>();
/**
* Default constructor.
*/
public IuEnumerableQueue() {
}
/**
* Constructor
*
* @param e supplies initial values
*/
public IuEnumerableQueue(Enumeration<T> e) {
this(e.asIterator());
}
/**
* Constructor
*
* @param e supplies initial values
*/
public IuEnumerableQueue(Iterator<T> e) {
e.forEachRemaining(this);
}
/**
* Constructor
*
* @param e supplies initial values
*/
public IuEnumerableQueue(Spliterator<T> e) {
e.forEachRemaining(this);
}
/**
* Constructor
*
* @param e supplies initial values
*/
public IuEnumerableQueue(Iterable<T> e) {
e.forEach(this);
}
/**
* Constructor
*
* @param e supplies initial values
*/
public IuEnumerableQueue(Stream<T> e) {
e.forEach(this);
}
@Override
public void accept(T t) {
queue.offer(Optional.ofNullable(t));
handleChange();
}
@Override
public boolean getAsBoolean() {
return !queue.isEmpty();
}
@Override
public boolean hasMoreElements() {
return !queue.isEmpty();
}
@Override
public boolean hasNext() {
return !queue.isEmpty();
}
@Override
public T next() {
return nextElement();
}
@Override
public T nextElement() {
final var next = queue.poll();
if (next == null)
throw new NoSuchElementException();
handleChange();
return next.orElse(null);
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
final var next = queue.poll();
if (next != null) {
handleChange();
action.accept(next.orElse(null));
return true;
} else
return false;
}
@Override
public Spliterator<T> trySplit() {
final var splitSize = queue.size() / 2;
if (splitSize < 16)
return null;
final var split = new ArrayDeque<T>();
while (split.size() < splitSize)
split.offer(queue.poll().orElse(null));
handleChange();
return split.spliterator();
}
@Override
public long estimateSize() {
return Long.MAX_VALUE;
}
@Override
public int characteristics() {
return Spliterator.CONCURRENT;
}
private synchronized void handleChange() {
notifyAll();
}
@Override
public void forEachRemaining(Consumer<? super T> action) {
Spliterator.super.forEachRemaining(action);
}
@Override
public Iterator<T> iterator() {
return this;
}
@Override
public Iterator<T> asIterator() {
return this;
}
/**
* Gets a {@link Stream} of queued elements.
*
* @return {@link Stream}
*/
public Stream<T> stream() {
return StreamSupport.stream(this, false);
}
/**
* Gets a parallel {@link Stream} of queued elements.
*
* @return {@link Stream}
*/
public Stream<T> parallelStream() {
return StreamSupport.stream(this, true);
}
}