// IuParallelWorkloadController.java
/*
* Copyright © 2025 Indiana University
* All rights reserved.
*
* BSD 3-Clause License
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* - Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package edu.iu;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import iu.ParallelTaskController;
/**
 * Controls parallel processing over a bounded workload.
 *
 * <p>
 * <img alt="UML Class Diagram" src=
 * "doc-files/edu.iu.IuParallelWorkloadController.svg" />
 * </p>
 *
 * <p>
 * Each controller is {@link AutoCloseable closeable}, timed, and operates in a
 * dedicated task executor. It's expected for all tasks related to the workload
 * to complete within a given {@link Duration timeout interval}, after which
 * active tasks will be interrupted and related resources will be torn down.
 * </p>
 *
 * <p>
 * All tasks <em>should</em> handle exceptions. By default, any
 * {@link Throwable} not handled by task execution notifies an immediate stop to
 * all work and for the controller to close. The exception that caused the
 * failure will be thrown as the cause of {@link ExecutionException} from
 * {@link #await()}.
 * </p>
 *
 * <p>
 * Task counters ({@code spawned}, {@code pending}, {@code completed}) are only
 * modified while holding this controller's monitor; they are {@code volatile}
 * so the getters may read them without locking.
 * </p>
 *
 * @see IuAsynchronousPipe
 * @see IuTaskController
 * @see IuRateLimitter
 */
public class IuParallelWorkloadController
        implements UnsafeFunction<UnsafeConsumer<IuTaskController>, IuTaskController>, AutoCloseable {

    private static final Logger LOG = Logger.getLogger(IuParallelWorkloadController.class.getName());

    private final Instant start;
    private final Instant expires;
    private final Duration timeout;
    private Duration gracefulShutdown = Duration.ofMillis(500L);
    private Duration gracefulTermination = Duration.ofSeconds(2L);
    private Duration gracefulDestroy = Duration.ofSeconds(3L);
    private Consumer<Throwable> failedExecutionHandler = this::defaultHandleFailedExecution;

    private final int size;
    private final ThreadLocal<Integer> usageCount = new ThreadLocal<>();

    private Logger log = LOG;
    private volatile long spawned;
    private volatile long pending;
    private volatile long completed;
    private volatile Throwable severeFailure;
    private volatile boolean closed;

    // The following three are released (set to null) by close()
    private ThreadGroup threadGroup;
    private ThreadPoolExecutor exec;
    private Timer closeTimer;

    /**
     * Timer task that closes the controller, recording any error raised while
     * closing as (or suppressed by) the severe failure reported from
     * {@link #await()}.
     */
    private class CloseTask extends TimerTask {
        @Override
        public void run() {
            final Logger log = IuParallelWorkloadController.this.log;
            try {
                log.config(() -> "Terminating all execution due to parallel workload timeout");
                close();
            } catch (Throwable e) {
                log.log(Level.WARNING, e, () -> "Execution errors detected by parallel workload timeout");
                recordSevereFailure(e);
            }
        }
    }

    /**
     * Creates a new workload controller.
     *
     * @param name    descriptive name of the workload, for logging and error
     *                reporting
     * @param size    maximum number of parallel tasks to execute at the same time
     * @param timeout total time to live for all workload-related tasks
     * @throws IllegalArgumentException if {@code timeout} or {@code size} is not
     *                                  positive
     */
    public IuParallelWorkloadController(String name, int size, Duration timeout) {
        if (Objects.requireNonNull(timeout, "timeout").isNegative() || timeout.isZero())
            throw new IllegalArgumentException("timeout must be positive");
        if (size < 1)
            throw new IllegalArgumentException("size must be positive");

        this.size = size;
        this.timeout = timeout;

        // Captured locally so worker threads always join this group, even if the
        // field has since been released by close()
        final ThreadGroup group = new ThreadGroup(name);
        threadGroup = group;

        // Computed as long to prevent overflow of size * 2 for very large sizes
        final int maxThreads = (int) Math.min(Integer.MAX_VALUE, Math.max(5L, 2L * size));
        exec = new ThreadPoolExecutor(size, maxThreads, timeout.toMillis(), TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(), r -> {
                    final long threadNum;
                    synchronized (IuParallelWorkloadController.this) {
                        threadNum = spawned++;
                    }
                    final Thread thread = new Thread(group, r, name + '/' + threadNum);
                    log.config("Spawned " + thread.getName());
                    return thread;
                });

        start = Instant.now();
        expires = start.plus(timeout);

        // Schedule the close task last: it runs close() on the timer thread and
        // must never observe a partially constructed controller
        final Timer closeTimer = new Timer(name + "/closeTimer");
        this.closeTimer = closeTimer;
        closeTimer.schedule(new CloseTask(), timeout.toMillis());
    }

    // BEGIN customization hooks

    /**
     * Provides an alternative logger to use for tracing workload events.
     *
     * <p>
     * Uses a class-level delegating default logger until this method provides an
     * alternative.
     * </p>
     *
     * @param log alternative logger
     * @throws IllegalStateException if an alternative logger was already provided
     */
    public void setLog(Logger log) {
        if (this.log != LOG)
            throw new IllegalStateException("Logger already initialized");
        Objects.requireNonNull(log, "log").config(() -> "Logger configured " + this);
        this.log = log;
    }

    /**
     * Sets the time to wait for worker threads to complete after closing the
     * controller, before shutting down the thread pool.
     *
     * @param gracefulShutdown {@link Duration}; must be positive
     */
    public void setGracefulShutdown(Duration gracefulShutdown) {
        if (Objects.requireNonNull(gracefulShutdown, "gracefulShutdown").compareTo(Duration.ZERO) <= 0)
            throw new IllegalArgumentException("Must be positive");
        this.gracefulShutdown = gracefulShutdown;
    }

    /**
     * Sets the time to wait for the thread pool to shut down, after closing the
     * controller and waiting for worker threads to
     * {@link #setGracefulShutdown(Duration) complete gracefully}, before
     * interrupting all threads managed by this controller.
     *
     * @param gracefulTermination {@link Duration}; must be positive
     */
    public void setGracefulTermination(Duration gracefulTermination) {
        if (Objects.requireNonNull(gracefulTermination, "gracefulTermination").compareTo(Duration.ZERO) <= 0)
            throw new IllegalArgumentException("Must be positive");
        this.gracefulTermination = gracefulTermination;
    }

    /**
     * Sets the time to wait for the thread pool to shut down, after closing the
     * controller, waiting for worker threads to
     * {@link #setGracefulShutdown(Duration) complete gracefully}, and interrupting
     * all threads managed by this controller.
     *
     * <p>
     * After the graceful destroy period has passed, all threads managed by this
     * controller and still running will be abandoned and a {@link Level#WARNING}
     * will be logged. This condition indicates a possible resource leak.
     * </p>
     *
     * @param gracefulDestroy {@link Duration}; must be positive
     */
    public void setGracefulDestroy(Duration gracefulDestroy) {
        if (Objects.requireNonNull(gracefulDestroy, "gracefulDestroy").compareTo(Duration.ZERO) <= 0)
            throw new IllegalArgumentException("Must be positive");
        this.gracefulDestroy = gracefulDestroy;
    }

    /**
     * Overrides the default execution handler.
     *
     * @param failedExecutionHandler consumer to accept any task execution failures,
     *                               <em>may</em> call
     *                               {@link #defaultHandleFailedExecution(Throwable)}
     *                               to inject default behavior; must not be null
     */
    public void setFailedExecutionHandler(Consumer<Throwable> failedExecutionHandler) {
        // Reject null here rather than failing later on a worker thread
        this.failedExecutionHandler = Objects.requireNonNull(failedExecutionHandler, "failedExecutionHandler");
    }

    /**
     * Invoked by default if any task throws an exception during execution.
     *
     * <p>
     * Records the failure for {@link #await()} and, unless the controller is
     * already closing, schedules an immediate close.
     * </p>
     *
     * @param cause task execution error
     */
    public synchronized void defaultHandleFailedExecution(Throwable cause) {
        recordSevereFailure(cause);

        // close() releases and cancels the timer while holding this monitor, so a
        // null timer means a close is already in progress or complete
        final Timer closeTimer = this.closeTimer;
        if (closeTimer != null)
            closeTimer.schedule(new CloseTask(), 0L);
    }

    /**
     * Gets a count of threads spawned by this controller.
     *
     * @return thread count
     */
    public long getSpawnedThreadCount() {
        return spawned;
    }

    /**
     * Gets a count of tasks submitted to this controller that have not yet
     * completed.
     *
     * @return pending task count
     */
    public long getPendingTaskCount() {
        return pending;
    }

    /**
     * Gets a count of tasks completed by this controller.
     *
     * @return completed task count
     */
    public long getCompletedTaskCount() {
        return completed;
    }

    /**
     * Gets the time elapsed since the controller was created.
     *
     * @return time elapsed
     */
    public Duration getElapsed() {
        return Duration.between(start, Instant.now());
    }

    /**
     * Gets the time remaining until the controller expires.
     *
     * @return time remaining; may be zero or negative if already
     *         {@link #isExpired()}
     */
    public Duration getRemaining() {
        return Duration.between(Instant.now(), expires);
    }

    /**
     * Gets the instant the timeout interval expires.
     *
     * @return instant the timeout interval expires.
     */
    public Instant getExpires() {
        return expires;
    }

    /**
     * Determines whether or not the controller has expired.
     *
     * <p>
     * Once expired, all threads waiting on the controller will be notified and the
     * controller will be closed. No more tasks may be submitted once the controller
     * has expired; all remaining tasks will be interrupted and/or throw
     * {@link IllegalStateException} with a message indicating a timeout.
     * </p>
     *
     * @return true if the controller is expired; else false
     */
    public boolean isExpired() {
        return !Instant.now().isBefore(expires);
    }

    /**
     * Wait until either all pending tasks have completed, or until the controller
     * {@link #isExpired() expires}.
     *
     * @throws ExecutionException   If a task used
     *                              {@link #defaultHandleFailedExecution(Throwable)}
     *                              to report an execution error. The first error
     *                              reported will be the cause, and additional
     *                              errors reported prior to shutdown will be
     *                              suppressed.
     * @throws InterruptedException if interrupted waiting for pending tasks to
     *                              complete. If task execution was interrupted,
     *                              that condition will be the cause of
     *                              {@link ExecutionException};
     *                              {@link InterruptedException} thrown directly
     *                              indicates this thread, not a task thread, was
     *                              interrupted.
     * @throws TimeoutException     if the controller expires before all pending
     *                              tasks complete normally. If task times out, that
     *                              condition will be the cause of
     *                              {@link ExecutionException};
     *                              {@link TimeoutException} thrown directly
     *                              indicates this thread, not a task thread, timed
     *                              out.
     */
    public synchronized void await() throws ExecutionException, InterruptedException, TimeoutException {
        // deadlock prevention: don't include the current thread if controlling a task
        int min = Thread.currentThread().getThreadGroup() == threadGroup ? 1 : 0;
        IuObject.waitFor(this, () -> severeFailure != null || pending <= min, expires, this::createTimeoutException);
        if (severeFailure != null)
            throw new ExecutionException(severeFailure);
    }

    /**
     * Submits an asynchronous task for processing.
     *
     * <p>
     * This method will block until a thread is available for executing the task,
     * or until the controller has {@link #isExpired() expired}. Applications
     * <em>should</em>, however, use {@link IuRateLimitter} or similar to prevent
     * the need for blocking, and <em>should</em> enforce SLOs on workload runtimes
     * to ensure algorithm scalability can be calculated to approach but not reach
     * the upper limit.
     * </p>
     *
     * <p>
     * To ensure resources are released gracefully and efficiently when the workload
     * timeout expires, both the thread that created the task and the thread
     * executing the task <em>may</em> use {@link IuTaskController} to synchronize
     * task execution.
     * </p>
     *
     * @param task task, will be provided a {@link IuTaskController} for
     *             synchronizing task execution; must not be null
     * @return {@link IuTaskController}
     * @throws TimeoutException           if the workload expiration timeout is
     *                                    reached before a worker thread is
     *                                    available
     * @throws InterruptedException       if the current thread is interrupted
     *                                    before a worker thread picks up the task
     * @throws RejectedExecutionException if the controller is closed, or the
     *                                    executor refuses the task
     */
    @Override
    public IuTaskController apply(UnsafeConsumer<IuTaskController> task) throws InterruptedException, TimeoutException {
        // Validate before reserving a slot, so a bad argument can't leak one
        Objects.requireNonNull(task, "task");

        final ExecutorService exec;
        synchronized (this) {
            IuObject.waitFor(this, () -> closed || pending < size, expires, this::createTimeoutException);
            if (closed) {
                final var closedException = new RejectedExecutionException("Closed " + this);
                if (severeFailure != null)
                    closedException.initCause(severeFailure);
                throw closedException;
            }

            // Capture under the lock: close() releases the field concurrently
            exec = this.exec;
            pending++;
            this.notifyAll();
        }

        boolean submitted = false;
        try {
            final ParallelTaskController taskController = new ParallelTaskController(expires);
            exec.submit(new Runnable() {
                private final String descr = task.toString();

                // Ensures pending/completed are adjusted exactly once per task;
                // only touched while holding the controller's monitor
                private boolean accounted;

                { // for coverage
                    toString();
                }

                @Override
                public void run() {
                    if (log.isLoggable(Level.FINE)) {
                        Integer use = usageCount.get();
                        if (use == null)
                            usageCount.set(1);
                        else {
                            if (++use % 10_000 == 0)
                                log.fine("used " + use + " times");
                            usageCount.set(use);
                        }
                    }

                    try {
                        taskController.accept(() -> {
                            log.finer(() -> "start " + descr);
                            task.accept(taskController);
                            log.finer(() -> "end " + descr + " " + taskController.getElapsed());
                            synchronized (IuParallelWorkloadController.this) {
                                accounted = true;
                                pending--;
                                completed++;
                                IuParallelWorkloadController.this.notifyAll();
                            }
                        });
                    } catch (Throwable e) {
                        log.log(Level.INFO, e, () -> "fail " + descr + " " + taskController.getElapsed());
                        synchronized (IuParallelWorkloadController.this) {
                            try {
                                failedExecutionHandler.accept(e);
                            } finally {
                                // Always release the slot, even if the handler throws
                                if (!accounted) {
                                    accounted = true;
                                    pending--;
                                    completed++;
                                }
                                IuParallelWorkloadController.this.notifyAll();
                            }
                        }
                    }
                }

                @Override
                public String toString() {
                    return IuParallelWorkloadController.this.toString() + " " + descr;
                }
            });
            submitted = true;
            return taskController;
        } finally {
            // The task never reached a worker: release the reserved slot so that
            // await() and other submitters are not blocked until expiration
            if (!submitted)
                synchronized (this) {
                    pending--;
                    this.notifyAll();
                }
        }
    }

    /**
     * Determines whether or not this controller is closed.
     *
     * @return true if closed; false if still available for accepting new tasks
     */
    public boolean isClosed() {
        return closed;
    }

    /**
     * Shuts down all activity and releases resources related to the workload.
     *
     * <p>
     * This method is invoked from a timer when the controller {@link #isExpired()
     * expires}. No more tasks can be submitted once the controller is closed.
     * Repeat calls to this method have no effect.
     * </p>
     *
     * @throws InterruptedException if the current thread is interrupted while
     *                              waiting for tasks or the thread pool to finish;
     *                              remaining worker threads are interrupted before
     *                              this exception is thrown
     * @throws TimeoutException     if worker threads are still active after being
     *                              interrupted and the graceful destroy period has
     *                              passed
     */
    @Override
    public void close() throws InterruptedException, TimeoutException {
        final ExecutorService exec;
        final ThreadGroup threadGroup;
        final Logger log;

        synchronized (this) {
            if (closed)
                return;

            log = this.log;
            this.log = LOG;
            log.fine("Close requested");

            exec = this.exec;
            threadGroup = this.threadGroup;
            final Timer closeTimer = this.closeTimer;
            this.exec = null;
            this.threadGroup = null;
            this.closeTimer = null;
            this.closed = true;

            closeTimer.cancel();
            this.notifyAll();
        }

        log.fine(() -> "Close reserved, pending = " + pending);

        try {
            final var endOfGracefulShutdown = Instant.now().plus(gracefulShutdown);
            // Check pending while holding the monitor so a task completing between
            // the check and the wait cannot be missed
            synchronized (this) {
                while (pending > 0) {
                    final var now = Instant.now();
                    if (!now.isBefore(endOfGracefulShutdown))
                        break;
                    final var waitFor = Duration.between(now, endOfGracefulShutdown);
                    this.wait(waitFor.toMillis(), waitFor.toNanosPart() % 1_000_000);
                }
            }

            if (pending > 0)
                // we are no longer tracking pending at this point, but not a WARNING
                // yet since thread pool shutdown should terminate abandoned resources
                log.info(() -> "Graceful shutdown timed out after " + gracefulShutdown);
            else
                log.fine("Graceful shutdown complete");

            try {
                log.fine("Executor shutdown requested");
                exec.shutdown();

                // wait for the longer of the remaining time to live and the
                // configured graceful termination period
                var termination = getRemaining();
                if (gracefulTermination.compareTo(termination) > 0)
                    termination = gracefulTermination;

                if (!exec.awaitTermination(termination.toNanos(), TimeUnit.NANOSECONDS)) {
                    log.info("Thread pool failed to terminate gracefully after " + termination + ", interrupting");
                    threadGroup.interrupt();
                    if (exec.awaitTermination(gracefulDestroy.toNanos(), TimeUnit.NANOSECONDS))
                        log.info("Terminated gracefully after interrupt");
                    else
                        throw new TimeoutException(
                                "Graceful thread termination timed out after " + termination.plus(gracefulDestroy) + ", "
                                        + threadGroup.activeCount() + " still active after interrupt");
                } else
                    log.fine("Terminated gracefully");

                log.fine("Executor shutdown complete");
            } finally {
                log.fine("Closed " + this);
            }
        } catch (InterruptedException e) {
            // This thread can't wait any longer, and the executor is no longer
            // reachable from the controller: stop workers now rather than leak them
            exec.shutdownNow();
            throw e;
        }
    }

    @Override
    public String toString() {
        final var sb = new StringBuilder("[IuParallelWorkloadController ");
        if (!closed)
            sb.append(getElapsed()).append(' ');
        sb.append('(');
        sb.append(pending);
        sb.append('/').append(spawned);
        sb.append(" of ").append(size);
        sb.append(" -> ").append(completed);
        sb.append(") ");
        if (isExpired())
            sb.append("expired ");
        else
            sb.append("expires ");
        sb.append(timeout);
        if (closed)
            sb.append(" closed");
        else
            sb.append("+").append(gracefulShutdown);
        if (threadGroup != null)
            sb.append(" threadGroup: ").append(threadGroup.getName());
        sb.append("]");
        return sb.toString();
    }

    /**
     * Records a failure to be reported by {@link #await()}, and wakes any waiting
     * threads.
     *
     * <p>
     * The first failure recorded becomes the severe failure; later failures are
     * added to it as suppressed. Reporting the same instance again is a no-op,
     * since {@link Throwable#addSuppressed(Throwable)} rejects self-suppression.
     * </p>
     *
     * @param cause failure to record
     */
    private synchronized void recordSevereFailure(Throwable cause) {
        final Throwable first = severeFailure;
        if (first == null)
            severeFailure = cause;
        else if (first != cause)
            first.addSuppressed(cause);
        notifyAll();
    }

    /**
     * Creates the exception thrown when the controller expires while a thread is
     * waiting on it.
     *
     * @return {@link TimeoutException} describing completed and remaining tasks
     */
    private TimeoutException createTimeoutException() {
        StringBuilder sb = new StringBuilder("Timed out in ");
        sb.append(Duration.between(start, expires));
        sb.append(" after completing ");
        final var completed = this.completed;
        sb.append(completed);
        sb.append(" task");
        if (completed != 1)
            sb.append('s');
        final var pending = this.pending;
        sb.append(", ").append(pending).append(" task");
        if (pending == 1)
            sb.append(" remaining");
        else
            sb.append("s remain");
        return new TimeoutException(sb.toString());
    }
}