IuRateLimitter.java
/*
* Copyright © 2024 Indiana University
* All rights reserved.
*
* BSD 3-Clause License
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* - Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package edu.iu;
import java.time.Duration;
import java.time.Instant;
import java.util.Deque;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
/**
* Unifies control over a bounded queue of {@link IuTaskController controlled
* tasks}.
*
* <p>
* This rate limiter is fail-fast. All tasks are expected to complete normally
* without error. The first error or timeout condition <em>should</em> be thrown
* when the controlling process reaches its next blocking operation.
* </p>
*
* <p>
* Once a fixed limit on incomplete tasks has been reached, new tasks cannot be
* accepted without first removing and joining the task at the head of the
* queue. Once any task has timed out or produced an error, any attempt to
* {@link #accept(IuTaskController)} or {@link #join()} will throw that error.
* Additional errors encountered after the first error will be suppressed.
* </p>
*
* <pre>
* final var limit = new IuRateLimitter(10, Duration.ofSeconds(1L));
* for (UnsafeRunnable task : tasks)
* limit.accept(workload.accept(task));
* limit.join();
* </pre>
*/
public class IuRateLimitter implements UnsafeConsumer<IuTaskController>, IuTaskController {

    /** Instant this rate limiter was created. */
    private final Instant start = Instant.now();

    /** Instant after which the overall workload is considered expired. */
    private final Instant expires;

    /** Upper bound on the number of tasks held in {@link #queue}. */
    private final int limit;

    /** Accepted tasks that have not yet been observed as complete. */
    private final Deque<IuTaskController> queue = new ConcurrentLinkedDeque<>();

    /** Errors observed so far, in observation order; the head is the first error. */
    private final Queue<Throwable> errors = new ConcurrentLinkedQueue<>();

    /** Task timeouts observed so far, in observation order. */
    private final Queue<TimeoutException> timeouts = new ConcurrentLinkedQueue<>();

    /**
     * Constructor.
     *
     * @param limit   upper limit on the number of tasks that may be pending in the
     *                queue.
     * @param timeout timeout interval
     */
    public IuRateLimitter(int limit, Duration timeout) {
        this(limit, Instant.now().plus(timeout));
    }

    /**
     * Constructor.
     *
     * @param limit   upper limit on the number of tasks that may be pending in the
     *                queue.
     * @param expires instant the workload timeout interval expires; must not be
     *                null
     * @throws IllegalArgumentException if {@code limit} is not positive
     * @throws NullPointerException     if {@code expires} is null
     */
    public IuRateLimitter(int limit, Instant expires) {
        if (limit <= 0) {
            throw new IllegalArgumentException("Limit must be positive");
        }
        this.limit = limit;
        // Fail at construction instead of on the first expiration check.
        this.expires = Objects.requireNonNull(expires, "expires");
    }

    /**
     * Checks for a non-blocking failure condition that would prevent a new task
     * from being {@link #accept(IuTaskController) accepted} for processing.
     *
     * <p>
     * The method <em>should</em> be called returning control from any blocking
     * operation, before scheduling a new task to be accepted by this rate limiter.
     * If the task would be rejected, an error is thrown to prevent it from being
     * scheduled.
     * </p>
     *
     * <p>
     * At the instant of return, the following conditions are guaranteed to have
     * been true:
     * </p>
     * <ul>
     * <li>No {@link IuTaskController#getError() error} was observed by an
     * {@link #accept(IuTaskController) accepted} task. {@link ExecutionException}
     * will be thrown with the first error observed as the cause; additional
     * observed errors will be suppressed.</li>
     * <li>No tasks have expired without completing successfully.
     * {@link TimeoutException} will be thrown; any related task
     * {@link TimeoutException}s will be suppressed.</li>
     * </ul>
     *
     * @throws ExecutionException if an {@link IuTaskController#getError() error}
     *                            has been observed by an
     *                            {@link #accept(IuTaskController) accepted} task.
     * @throws TimeoutException   if any task has expired; related task timeouts
     *                            will be suppressed
     */
    public void failFast() throws ExecutionException, TimeoutException {
        observeCompletedTasks();

        // Errors take precedence over timeouts.
        if (!errors.isEmpty()) {
            final var errorIterator = errors.iterator();
            final var executionException = new ExecutionException(errorIterator.next());
            errorIterator.forEachRemaining(executionException::addSuppressed);
            timeouts.forEach(executionException::addSuppressed);
            throw executionException;
        }

        final var now = Instant.now();
        if (!timeouts.isEmpty() //
                || !now.isBefore(expires)) {
            // Report the time actually elapsed when a task timed out ahead of the
            // overall expiration, otherwise the full configured interval.
            final Duration timeout;
            if (now.isBefore(expires)) {
                timeout = Duration.between(start, now);
            } else {
                timeout = Duration.between(start, expires);
            }
            final var timeoutException = new TimeoutException("Timed out after " + timeout);
            timeouts.forEach(timeoutException::addSuppressed);
            throw timeoutException;
        }
    }

    /**
     * Accepts a task for parallel processing, blocking until an error is observed,
     * a timeout interval expires, the task completes successfully, or until there
     * is room in the queue.
     *
     * <p>
     * At the instant of return, the following conditions are guaranteed to have
     * been true:
     * </p>
     * <ul>
     * <li>{@link #failFast()} returned successfully.</li>
     * <li>The new task did not observe an error
     * <ul>
     * <li>else {@link ExecutionException} is thrown with the observed error as the
     * cause.</li>
     * </ul>
     * </li>
     * <li>The new task did not {@link IuTaskController#isExpired() expire}
     * <ul>
     * <li>else {@link TimeoutException} is thrown.</li>
     * </ul>
     * </li>
     * <li>The task either {@link IuTaskController#isComplete() completed}
     * {@link IuTaskController#isSuccess() successfully}, or was accepted for
     * parallel processing.</li>
     * <li>Accepted tasks <em>may</em> have been {@link IuTaskController#join()
     * joined} to create room in the queue.</li>
     * <li>No more than {@link #IuRateLimitter(int, Instant) limit} accepted tasks,
     * including the new task, were incomplete.</li>
     * </ul>
     *
     * @param taskController task controller
     * @throws ExecutionException   if an error is observed; the observed error is
     *                              the cause
     * @throws InterruptedException from {@link IuTaskController#join()}
     * @throws TimeoutException     if a timeout interval expires
     */
    @Override
    public void accept(IuTaskController taskController)
            throws ExecutionException, InterruptedException, TimeoutException {
        while (true) {
            failFast();

            // Completion is sampled before the error/expiration checks so a task
            // that finishes in between is still checked for an error first.
            final var complete = taskController.isComplete();

            final var error = taskController.getError();
            if (error != null) {
                errors.offer(error);
                throw new ExecutionException(error);
            }

            if (taskController.isExpired()) {
                try {
                    taskController.join();
                    // _should_ throw TimeoutException
                } catch (TimeoutException e) {
                    timeouts.add(e);
                    throw e;
                }
            }

            if (complete) {
                return;
            }

            final IuTaskController overflowTask;
            synchronized (this) {
                // Check-then-act on the queue size must be atomic with respect to
                // other callers of accept().
                if (queue.size() >= limit) {
                    overflowTask = queue.poll();
                } else {
                    queue.offer(taskController);
                    return;
                }
            }

            // Queue was full: wait for the head task outside the lock, then retry.
            joinAndObserve(overflowTask);
        }
    }

    /**
     * Gets the instant this rate limiter was created.
     *
     * @return start instant
     */
    @Override
    public Instant getStart() {
        return start;
    }

    /**
     * Gets the time elapsed since this rate limiter was created.
     *
     * @return duration between {@link #getStart()} and now
     */
    @Override
    public Duration getElapsed() {
        return Duration.between(start, Instant.now());
    }

    /**
     * Gets the time remaining until the configured expiration.
     *
     * @return {@link Duration#ZERO} when a task timeout has flagged this rate
     *         limiter as expired ahead of the configured expiration; otherwise
     *         the duration from now until the configured expiration, which is
     *         zero or negative once that instant has passed
     */
    @Override
    public Duration getRemaining() {
        final var now = Instant.now();
        // isExpired() already observes completed tasks; a second pass is redundant.
        if (isExpired() && now.isBefore(expires)) {
            return Duration.ZERO;
        }
        return Duration.between(now, expires);
    }

    /**
     * Gets the effective expiration instant.
     *
     * @return the current instant when this rate limiter is already expired ahead
     *         of the configured expiration; otherwise the configured expiration
     */
    @Override
    public Instant getExpires() {
        if (isExpired()) {
            final var now = Instant.now();
            if (now.isBefore(expires)) {
                return now;
            }
        }
        return expires;
    }

    /**
     * Determines whether all accepted tasks have been observed as complete.
     *
     * @return true if the queue is empty after observing completed tasks
     */
    @Override
    public boolean isComplete() {
        observeCompletedTasks();
        return queue.isEmpty();
    }

    /**
     * Determines whether all accepted tasks completed without an observed error.
     *
     * @return true if {@link #isComplete()} and no errors have been observed
     */
    @Override
    public boolean isSuccess() {
        return isComplete() && errors.isEmpty();
    }

    /**
     * Gets the first error observed, without re-observing queued tasks.
     *
     * @return first observed error, or null if none has been observed
     */
    @Override
    public Throwable getError() {
        return errors.peek();
    }

    /**
     * Determines whether a task timeout has been observed or the configured
     * expiration has been reached.
     *
     * @return true if expired
     */
    @Override
    public boolean isExpired() {
        observeCompletedTasks();
        return !timeouts.isEmpty() || !Instant.now().isBefore(expires);
    }

    /**
     * Removes and joins queued tasks, in queue order, until the queue is empty or
     * a failure is observed.
     *
     * @throws ExecutionException   if an error has been or is observed
     * @throws InterruptedException from {@link IuTaskController#join()}
     * @throws TimeoutException     if a timeout has been or is observed
     */
    @Override
    public void join() throws ExecutionException, InterruptedException, TimeoutException {
        failFast();
        IuTaskController task;
        while ((task = queue.poll()) != null) {
            joinAndObserve(task);
        }
    }

    /**
     * Delegates to {@link IuTaskController#pause()} on the task at the head of the
     * queue, if any, after observing completed tasks.
     *
     * @throws InterruptedException from {@link IuTaskController#pause()}
     * @throws TimeoutException     from {@link IuTaskController#pause()}
     */
    @Override
    public void pause() throws InterruptedException, TimeoutException {
        observeCompletedTasks();
        final var task = queue.peekFirst();
        if (task != null) {
            task.pause();
        }
    }

    /**
     * Invokes {@link IuTaskController#unpause()} on every task remaining in the
     * queue after observing completed tasks.
     */
    @Override
    public void unpause() {
        observeCompletedTasks();
        queue.forEach(IuTaskController::unpause);
    }

    /**
     * Invokes {@link IuTaskController#interrupt()} on every task remaining in the
     * queue after observing completed tasks.
     */
    @Override
    public void interrupt() {
        observeCompletedTasks();
        queue.forEach(IuTaskController::interrupt);
    }

    /**
     * Joins a task, recording any error or timeout it reports before re-throwing.
     *
     * @param taskController task to join
     * @throws ExecutionException   from {@link IuTaskController#join()}; its cause
     *                              and suppressed exceptions are recorded as
     *                              errors
     * @throws InterruptedException from {@link IuTaskController#join()}; not
     *                              recorded
     * @throws TimeoutException     from {@link IuTaskController#join()}; recorded
     *                              as a timeout
     */
    private void joinAndObserve(IuTaskController taskController)
            throws ExecutionException, InterruptedException, TimeoutException {
        try {
            taskController.join();
        } catch (ExecutionException e) {
            // The error queue rejects null; fall back to the exception itself so a
            // missing cause cannot mask the failure with a NullPointerException.
            final var cause = e.getCause();
            errors.add(cause == null ? e : cause);
            for (Throwable suppressed : e.getSuppressed()) {
                errors.offer(suppressed);
            }
            throw e;
        } catch (TimeoutException e) {
            timeouts.add(e);
            throw e;
        }
    }

    /**
     * Records the outcome of a completed task: its error if it has one, or a
     * placeholder error if it reports neither an error nor success.
     *
     * @param task completed task
     */
    private void observeCompleted(IuTaskController task) {
        final var error = task.getError();
        if (error != null) {
            errors.offer(error);
        } else if (!task.isSuccess()) {
            errors.offer(new IllegalStateException("Task completed unsuccessfully but didn't provide an error"));
        }
    }

    /**
     * Observes status (non-blocking) of all queued tasks, flags the process as
     * expired, observes errors, and removes completed tasks from the queue.
     */
    private void observeCompletedTasks() {
        final var i = queue.iterator();
        while (i.hasNext()) {
            final var next = i.next();
            if (next.isExpired()) {
                try {
                    joinAndObserve(next);
                } catch (TimeoutException | ExecutionException ignored) {
                    // Already recorded by joinAndObserve; silence the re-throw.
                } catch (InterruptedException e) {
                    // Not expected from an expired task, but never lose the
                    // interrupt: restore the flag for the next blocking call.
                    Thread.currentThread().interrupt();
                }
            }
            if (next.isComplete()) {
                i.remove();
                observeCompleted(next);
            }
        }
    }
}