IuCommonDataSource.java

/*
 * Copyright © 2024 Indiana University
 * All rights reserved.
 *
 * BSD 3-Clause License
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 * 
 * - Redistributions of source code must retain the above copyright notice, this
 *   list of conditions and the following disclaimer.
 * 
 * - Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 * 
 * - Neither the name of the copyright holder nor the names of its
 *   contributors may be used to endorse or promote products derived from
 *   this software without specific prior written permission.
 * 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package edu.iu.jdbc.pool;

import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.time.Duration;
import java.time.Instant;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.logging.Logger;

import javax.sql.CommonDataSource;
import javax.sql.ConnectionEvent;
import javax.sql.ConnectionEventListener;
import javax.sql.DataSource;
import javax.sql.PooledConnection;
import javax.sql.XADataSource;

import edu.iu.IuException;
import edu.iu.IuObject;
import edu.iu.IuUtilityTaskController;
import edu.iu.UnsafeFunction;
import edu.iu.UnsafeRunnable;
import edu.iu.UnsafeSupplier;

/**
 * Abstract common database connection pool implementation.
 * 
 * <p>
 * May be overridden to implement {@link DataSource} or {@link XADataSource} and
 * integrate with an application runtime environment.
 * </p>
 * 
 * @see #getPooledConnection()
 */
public abstract class IuCommonDataSource implements CommonDataSource, ConnectionEventListener, AutoCloseable {

	static {
		Logger.getLogger(IuCommonDataSource.class.getPackageName());
	}
	private static final Logger LOG = Logger.getLogger(IuCommonDataSource.class.getName());

	private final Queue<IuPooledConnection> openConnections = new ConcurrentLinkedQueue<>();
	private final Queue<IuPooledConnection> reusableConnections = new ConcurrentLinkedQueue<>();
	private final UnsafeSupplier<? extends PooledConnection> factory;

	private int loginTimeout = 15;
	private String url;
	private String username;
	private String schema;
	private int maxSize = 16;
	private int maxRetry = 1;
	private long maxConnectionReuseCount = 100;
	private Duration maxConnectionReuseTime = Duration.ofMinutes(15L);
	private Duration abandonedConnectionTimeout = Duration.ofMinutes(30L);
	private Duration shutdownTimeout = Duration.ofSeconds(30L);

	private String validationQuery;
	private Duration validationInterval = Duration.ofSeconds(15L);
	private UnsafeFunction<Connection, Connection> connectionInitializer;
	private UnsafeRunnable onClose;

	private boolean closed;
	private volatile int pendingConnections;

	/**
	 * Default constructor.
	 * 
	 * @param factory {@link UnsafeSupplier} of downstream {@link PooledConnection}
	 *                instances; each {@link UnsafeSupplier#get()} invocation
	 *                <em>must</em> return a newly established physical database
	 *                connection.
	 */
	protected IuCommonDataSource(UnsafeSupplier<? extends PooledConnection> factory) {
		this.factory = factory;
	}

	/**
	 * Checks out a {@link PooledConnection}.
	 * 
	 * <img src="doc-files/IuCommonDataSource.svg" alt="UML Communication Diagram">
	 * 
	 * <p>
	 * <strong>Implementation Note:</strong> The upstream {@link DataSource}
	 * implementation should discard this instance once the logical
	 * {@link Connection} view has been obtained. Application code will invoke
	 * {@link Connection#close()} to return the connection to the pool to be reused
	 * or retired. Note that invoking {@link PooledConnection#close()} <em>will</em>
	 * close the physical connection and remove it from the pool. This facilitates
	 * ejecting physical connections by an upstream pool manager.
	 * </p>
	 * 
	 * @return {@link PooledConnection}
	 * @throws SQLException if the connection fails due to a database error
	 */
	public IuPooledConnection getPooledConnection() throws SQLException {
		IuPooledConnection iuPooledConnection = null;
		Instant timeout = Instant.now().plusSeconds(loginTimeout);

		var attempt = 0;
		Throwable error = null;
		while (!closed //
				&& attempt <= maxRetry //
				&& timeout.isAfter(Instant.now()))
			try {
				attempt++;

				synchronized (this) {
					IuObject.waitFor(this, () -> closed //
							|| !reusableConnections.isEmpty() //
							|| !this.isExhausted(), timeout);

					pendingConnections++;
				}

				try {
					while (!reusableConnections.isEmpty()) {
						final var reusableConnection = reusableConnections.poll();

						final var timeSinceInit = Duration.between(reusableConnection.getConnectionInitiated(),
								Instant.now());
						if (timeSinceInit.compareTo(maxConnectionReuseTime) >= 0) {
							reusableConnection.close();
							LOG.fine(() -> "jdbc-pool-retire-timeout:" + timeSinceInit + ' ' + reusableConnection + ' '
									+ this);
							continue;
						}

						iuPooledConnection = reusableConnection;
						LOG.finer(() -> "jdbc-pool-reuse; " + reusableConnection + ' ' + this);
						break;
					}

					if (iuPooledConnection == null)
						iuPooledConnection = openConnection(timeout);

					final var lastUsed = iuPooledConnection.getLastTransactionSegmentEnded();
					if (validationQuery != null //
							&& (lastUsed == null //
									|| Duration.between(lastUsed, Instant.now()).compareTo(validationInterval) >= 0))
						iuPooledConnection.validate(validationQuery);

					if (error != null)
						LOG.log(Level.INFO, error, () -> "jdbc-pool-recoverable; " + this);

					return iuPooledConnection;

				} finally {
					synchronized (this) {
						pendingConnections--;
						this.notifyAll();
					}
				}

			} catch (Throwable e) {
				if (iuPooledConnection != null) {
					IuException.suppress(e, iuPooledConnection::close);
					iuPooledConnection = null;
				}

				if (error == null)
					error = e;
				else
					error.addSuppressed(e);
			}

		throw new SQLException("jdbc-pool-fail: attempt=" + attempt + ", timeout=" + timeout + "; " + this, error);
	}

	@Override
	public void connectionClosed(ConnectionEvent event) {
		final var reusableConnection = (IuPooledConnection) event.getSource();

		final var count = reusableConnection.getTransactionSegmentCount();
		if (count >= maxConnectionReuseCount) {
			try {
				reusableConnection.close();
				LOG.fine(() -> "jdbc-pool-retire-count:" + count + ' ' + reusableConnection + ' ' + this);
			} catch (Throwable e) {
				LOG.log(Level.INFO, e, () -> "jdbc-pool-retire-count:" + count + ' ' + reusableConnection + ' ' + this);
			}
			return;
		}

		if (!closed) {
			LOG.finer(() -> "jdbc-pool-reusable; " + reusableConnection);
			reusableConnections.offer(reusableConnection);
			synchronized (this) {
				this.notifyAll();
			}
		}
	}

	@Override
	public void connectionErrorOccurred(ConnectionEvent event) {
		reusableConnections.remove((IuPooledConnection) event.getSource());
	}

	@Override
	public Logger getParentLogger() {
		return LogManager.getLogManager().getLogger(IuCommonDataSource.class.getPackageName());
	}

	@Override
	public PrintWriter getLogWriter() throws SQLException {
		return null;
	}

	@Override
	public void setLogWriter(PrintWriter out) throws SQLException {
		throw new SQLFeatureNotSupportedException();
	}

	@Override
	public void setLoginTimeout(int seconds) throws SQLException {
		if (seconds < 0)
			throw new IllegalArgumentException();
		else if (seconds == 0)
			loginTimeout = 15;
		else
			loginTimeout = seconds;
	}

	@Override
	public int getLoginTimeout() {
		return loginTimeout;
	}

	/**
	 * Determines whether or not this database pool is closed.
	 * 
	 * @return true if closed; else false
	 */
	public boolean isClosed() {
		return closed;
	}

	/**
	 * Gets the number of open connections immediately available for reuse.
	 * 
	 * @return number of open connections immediately available for reuse
	 */
	public int getAvailable() {
		return reusableConnections.size();
	}

	/**
	 * Gets a count of all open connections in the pool.
	 * 
	 * @return count of all open connections
	 */
	public int getOpen() {
		return openConnections.size();
	}

	/**
	 * Gets the URL used to initialize the downstream connection factory.
	 * 
	 * @return Full JDBC URL
	 */
	public String getUrl() {
		return url;
	}

	/**
	 * Sets the URL used to initialize the downstream connection factory.
	 * 
	 * @param url Full JDBC URL
	 */
	public void setUrl(String url) {
		this.url = url;
	}

	/**
	 * Gets the database username used to initialize the downstream connection
	 * factory.
	 * 
	 * @return Database username
	 */
	public String getUsername() {
		return username;
	}

	/**
	 * Sets the database username used to initialize the downstream connection
	 * factory.
	 * 
	 * @param username Database username
	 */
	public void setUsername(String username) {
		this.username = username;
	}

	/**
	 * Gets the database schema used to initialize the downstream connection
	 * factory.
	 * 
	 * @return Database schema
	 */
	public String getSchema() {
		return schema;
	}

	/**
	 * Sets the database schema used to initialize the downstream connection
	 * factory.
	 * 
	 * @param schema Database schema
	 */
	public void setSchema(String schema) {
		this.schema = schema;
	}

	/**
	 * Gets the maximum number of connections to allow in the pool.
	 * 
	 * @return Pool max size
	 */
	public int getMaxSize() {
		return maxSize;
	}

	/**
	 * Sets the maximum number of connections to allow in the pool.
	 * 
	 * @param maxSize Pool max size
	 */
	public void setMaxSize(int maxSize) {
		this.maxSize = maxSize;
	}

	/**
	 * Gets the maximum number of times a connection attempt will be retried before
	 * resulting in failure.
	 * 
	 * @return maximum number of times a connection attempt will be retried before
	 *         resulting in failure.
	 */
	public int getMaxRetry() {
		return maxRetry;
	}

	/**
	 * Gets the maximum number of times a connection attempt will be retried before
	 * resulting in failure.
	 * 
	 * @param maxRetry maximum number of times a connection attempt will be retried
	 *                 before resulting in failure.
	 */
	public void setMaxRetry(int maxRetry) {
		this.maxRetry = maxRetry;
	}

	/**
	 * Gets the maximum number of times a single connection can be used before
	 * ejecting from the pool.
	 * 
	 * @return Per-connection max reuse count
	 */
	public long getMaxConnectionReuseCount() {
		return maxConnectionReuseCount;
	}

	/**
	 * Sets the maximum number of times a single connection can be used before
	 * ejecting from the pool.
	 * 
	 * @param maxConnectionReuseCount Per-connection max reuse count
	 */
	public void setMaxConnectionReuseCount(long maxConnectionReuseCount) {
		this.maxConnectionReuseCount = maxConnectionReuseCount;
	}

	/**
	 * Gets the maximum length of time a single connection can remain open before
	 * ejecting from the pool.
	 * 
	 * @return Per-connection max reuse time
	 */
	public Duration getMaxConnectionReuseTime() {
		return maxConnectionReuseTime;
	}

	/**
	 * Gets the maximum length of time a single connection can remain open before
	 * ejecting from the pool.
	 * 
	 * @param maxConnectionReuseTime Per-connection max reuse time
	 */
	public void setMaxConnectionReuseTime(Duration maxConnectionReuseTime) {
		this.maxConnectionReuseTime = maxConnectionReuseTime;
	}

	/**
	 * Gets the maximum length of time a connection can be checked out from the pool
	 * before attempting to forcibly close and consider it abandoned.
	 * 
	 * @return Abandoned connection timeout interval
	 */
	public Duration getAbandonedConnectionTimeout() {
		return abandonedConnectionTimeout;
	}

	/**
	 * Sets the maximum length of time a connection can be checked out from the pool
	 * before attempting to forcibly close and consider it abandoned.
	 * 
	 * @param abandonedConnectionTimeout Abandoned connection timeout interval
	 */
	public void setAbandonedConnectionTimeout(Duration abandonedConnectionTimeout) {
		this.abandonedConnectionTimeout = abandonedConnectionTimeout;
	}

	/**
	 * Gets the maximum length of time to wait for all connections to close on
	 * shutdown.
	 * 
	 * @return Maximum length of time to wait for all connections to close
	 *         gracefully
	 */
	public Duration getShutdownTimeout() {
		return shutdownTimeout;
	}

	/**
	 * Sets the maximum length of time to wait for all connections to close on
	 * shutdown.
	 * 
	 * @param shutdownTimeout Maximum length of time to wait for all connections to
	 *                        close gracefully
	 */
	protected void setShutdownTimeout(Duration shutdownTimeout) {
		this.shutdownTimeout = shutdownTimeout;
	}

	/**
	 * Gets the query to use for validating connections on creation, and
	 * intermittently before checking out from the pool.
	 * 
	 * @return SQL select statement, <em>must</em> return a single row with a single
	 *         non-null column; may be null to skip query validation
	 */
	public String getValidationQuery() {
		return validationQuery;
	}

	/**
	 * Sets the query to use for validating connections on creation, and
	 * intermittently before checking out from the pool.
	 * 
	 * @param validationQuery SQL select statement, <em>must</em> return a single
	 *                        row with a single non-null column; may be null to skip
	 *                        query validation
	 */
	public void setValidationQuery(String validationQuery) {
		this.validationQuery = validationQuery;
	}

	/**
	 * Gets the frequency at which to validate connections, when
	 * {@link #getValidationQuery()} returns a non-null value.
	 *
	 * @return Frequency at which to validate connections; may be
	 */
	public Duration getValidationInterval() {
		return validationInterval;
	}

	/**
	 * Sets the frequency at which to validate connections, when
	 * {@link #getValidationQuery()} returns a non-null value.
	 *
	 * @param validationInterval Frequency at which to validate connections; may be
	 */
	public void setValidationInterval(Duration validationInterval) {
		this.validationInterval = validationInterval;
	}

	/**
	 * Sets an optional transform function to be apply directly before checking out
	 * a connection from the pool.
	 * 
	 * @param connectionInitializer {@link UnsafeFunction}: accepts and returns a
	 *                              {@link Connection} such that
	 *                              {@link Connection#unwrap(Class)} invoked on the
	 *                              return value delegates to the {@link Connection}
	 *                              passed as an argument; <em>should not</em> throw
	 *                              checked exceptions other than
	 *                              {@link SQLException}; <em>may</em> throw
	 *                              {@link TimeoutException} or
	 *                              {@link InterruptedException}.
	 */
	public void setConnectionInitializer(UnsafeFunction<Connection, Connection> connectionInitializer) {
		this.connectionInitializer = connectionInitializer;
	}

	/**
	 * Sets an optional shutdown hook to be invoked from {@link #close()} after all
	 * physical connections managed by the pool have been closed.
	 * 
	 * @param onClose {@link UnsafeRunnable}
	 */
	public void setOnClose(UnsafeRunnable onClose) {
		this.onClose = onClose;
	}

	/**
	 * Waits for completion and closes all open connections.
	 */
	@Override
	public synchronized void close() throws SQLException {
		if (!closed) {
			closed = true;

			class CloseStatus {
				Throwable error = null;
			}
			final var closeStatus = new CloseStatus();

			while (!reusableConnections.isEmpty())
				closeStatus.error = IuException.suppress(closeStatus.error, () -> reusableConnections.poll().close());

			IuException.suppress(closeStatus.error, () -> IuObject.waitFor(this, () -> {
				for (final var c : openConnections)
					if (c.getLogicalConnectionOpened() == null)
						closeStatus.error = IuException.suppress(closeStatus.error, () -> c.close());

				return openConnections.isEmpty();
			}, shutdownTimeout));

			if (onClose != null)
				closeStatus.error = IuException.suppress(closeStatus.error, onClose);

			closeStatus.error = IuException.suppress(closeStatus.error, () -> {
				final var size = openConnections.size();
				if (size > 0)
					throw new SQLException(
							size + " connections remaining in the pool after graceful shutdown " + shutdownTimeout);
			});

			if (closeStatus.error != null)
				throw IuException.checked(closeStatus.error, SQLException.class);
		}
	}

	@Override
	public String toString() {
		// Not using JSON-P to avoid complex dependency issues with legacy apps
		final var sb = new StringBuilder("{");
		final BiConsumer<String, Object> addValue = (n, v) -> {
			if (sb.length() > 1)
				sb.append(',');
			sb.append('\"').append(n).append("\":").append(v);
		};
		final BiConsumer<String, Object> addText = (n, t) -> {
			if (t == null)
				return;
			addValue.accept(n, '\"' + t.toString().replace("\\", "\\\\").replace("\"", "\\\"") + '\"');
		};
		addText.accept("type", getClass().getSimpleName());
		addText.accept("url", getUrl());
		addText.accept("username", getUsername());
		addText.accept("schema", getSchema());
		addValue.accept("available", getAvailable());
		addValue.accept("open", getOpen());
		addValue.accept("maxSize", getMaxSize());
		addValue.accept("maxRetry", getMaxRetry());
		addValue.accept("closed", isClosed());
		addValue.accept("maxConnectionReuseCount", getMaxConnectionReuseCount());
		addText.accept("maxConnectionReuseTime", getMaxConnectionReuseTime());
		addText.accept("abandonedConnectionTimeout", getAbandonedConnectionTimeout());
		addText.accept("validationQuery", getValidationQuery());
		addText.accept("validationInterval", getValidationInterval());
		addText.accept("shutdownTimeout", getShutdownTimeout());
		return sb.append('}').toString();
	}

	private synchronized boolean isExhausted() {
		return openConnections.size() + pendingConnections >= maxSize;
	}

	private IuPooledConnection openConnection(Instant timeout) throws SQLException {
		if (closed)
			throw new SQLException("closed");

		final var initTime = Instant.now();
		final var pooledConnection = IuException.checked(SQLException.class, () -> {
			try {
				return IuUtilityTaskController.getBefore(factory, timeout);
			} catch (TimeoutException e) {
				throw new SQLException(e);
			}
		});

		final var newConnection = new IuPooledConnection(initTime, pooledConnection, connectionInitializer,
				abandonedConnectionTimeout, this::handleClose);
		newConnection.addConnectionEventListener(this);

		openConnections.offer(newConnection);
		LOG.fine(() -> "jdbc-pool-open:" + Duration.between(initTime, Instant.now()) + ' ' + pooledConnection + ' '
				+ this);

		return newConnection;
	}

	private void handleClose(IuPooledConnection closedConnection) {
		openConnections.remove(closedConnection);
		synchronized (this) {
			this.notifyAll();
		}

		final var error = closedConnection.error();
		if (error == null)
			LOG.fine(() -> "jdbc-pool-close:" + Duration.between(closedConnection.getConnectionInitiated(), Instant.now())
					+ ' ' + closedConnection + ' ' + this);
		else
			LOG.log(Level.WARNING, error,
					() -> "jdbc-pool-close:" + Duration.between(closedConnection.getConnectionInitiated(), Instant.now())
							+ ' ' + closedConnection + ' ' + this);
	}

}