in wrapper/src/main/java/software/amazon/jdbc/plugin/efm/MonitorImpl.java [142:301]
// Monitoring loop executed by this monitor's thread.
//
// Each iteration of the loop:
//   1. Drains newContexts. Active contexts whose expected monitoring start time has been reached
//      are moved to activeContexts, active contexts that are not due yet are re-queued, and
//      inactive contexts are dropped.
//   2. If there are active contexts, or the monitoring connection is missing or closed, checks
//      the connection status once and reports the result to every active context. The thread
//      then sleeps for the shortest failure detection interval among the contexts that remain
//      active (less the time the check took, and never less than
//      MIN_CONNECTION_CHECK_TIMEOUT_MILLIS), or for THREAD_SLEEP_WHEN_INACTIVE_MILLIS when no
//      context remains active.
//   3. Otherwise sleeps for THREAD_SLEEP_WHEN_INACTIVE_MILLIS, or releases this monitor and
//      leaves the loop once monitorDisposalTimeMillis has elapsed since the last status check.
//
// Exceptions other than InterruptedException raised inside an iteration are logged at FINEST
// and the loop continues. An InterruptedException ends the loop; the thread's interrupt status
// is restored after cleanup. On exit the monitor is released from threadContainer, marked as
// stopped, and the monitoring connection (if any) is closed.
public void run() {
  LOGGER.finest(() -> Messages.get(
      "MonitorImpl.startMonitoringThread",
      new Object[]{this.hostSpec.getHost()}));

  // Remembered so the interrupt status can be restored once cleanup has finished.
  boolean interrupted = false;

  try {
    this.stopped = false;
    while (true) {
      try {
        // process new contexts
        MonitorConnectionContext newMonitorContext;
        MonitorConnectionContext firstAddedNewMonitorContext = null;
        final long currentTimeNano = this.getCurrentTimeNano();

        while ((newMonitorContext = this.newContexts.poll()) != null) {
          if (firstAddedNewMonitorContext == newMonitorContext) {
            // This context has already been processed.
            // Add it back to the queue and process it in the next round.
            this.newContexts.add(newMonitorContext);
            break;
          }

          // Inactive contexts are simply dropped from the queue.
          if (newMonitorContext.isActiveContext()) {
            if (newMonitorContext.getExpectedActiveMonitoringStartTimeNano() > currentTimeNano) {
              // The context active monitoring time hasn't come.
              // Add the context to the queue and check it later.
              this.newContexts.add(newMonitorContext);
              if (firstAddedNewMonitorContext == null) {
                firstAddedNewMonitorContext = newMonitorContext;
              }
            } else {
              // It's time to start actively monitor this context.
              this.activeContexts.add(newMonitorContext);
            }
          }
        }

        if (!this.activeContexts.isEmpty()
            || this.monitoringConn == null
            || this.monitoringConn.isClosed()) {

          final long statusCheckStartTimeNano = this.getCurrentTimeNano();
          this.contextLastUsedTimestampNano = statusCheckStartTimeNano;

          // A single status check is shared by every active context in this round.
          final ConnectionStatus status = checkConnectionStatus(this.nodeCheckTimeoutMillis);

          // -1 means "no context remained active after this round".
          long delayMillis = -1;
          MonitorConnectionContext monitorContext;
          MonitorConnectionContext firstAddedMonitorContext = null;

          while ((monitorContext = this.activeContexts.poll()) != null) {
            monitorContext.getLock().lock();
            try {
              // The "already processed" check must come before the "inactive" check. If the
              // first re-queued context were dropped for being inactive without ending the
              // loop, the remaining re-queued contexts would never match it and would be
              // updated with the same status result over and over.
              if (firstAddedMonitorContext == monitorContext) {
                // this context has already been processed by this loop
                // add it to the queue (only if it is still active) and exit this loop
                if (monitorContext.isActiveContext()) {
                  this.activeContexts.add(monitorContext);
                }
                break;
              }

              // If context is already invalid, just skip it
              if (!monitorContext.isActiveContext()) {
                continue;
              }

              // otherwise, process this context
              monitorContext.updateConnectionStatus(
                  this.hostSpec.getUrl(),
                  statusCheckStartTimeNano,
                  statusCheckStartTimeNano + status.elapsedTimeNano,
                  status.isValid);

              // If context is still valid and node is still healthy, it needs to continue updating this context
              if (monitorContext.isActiveContext() && !monitorContext.isNodeUnhealthy()) {
                this.activeContexts.add(monitorContext);
                if (firstAddedMonitorContext == null) {
                  firstAddedMonitorContext = monitorContext;
                }

                // Track the smallest failure detection interval among the remaining contexts.
                if (delayMillis == -1 || delayMillis > monitorContext.getFailureDetectionIntervalMillis()) {
                  delayMillis = monitorContext.getFailureDetectionIntervalMillis();
                }
              }
            } finally {
              monitorContext.getLock().unlock();
            }
          }

          if (delayMillis == -1) {
            // No active contexts
            delayMillis = THREAD_SLEEP_WHEN_INACTIVE_MILLIS;
          } else {
            // The time already spent on the status check counts towards the interval.
            delayMillis -= TimeUnit.NANOSECONDS.toMillis(status.elapsedTimeNano);
            // Check for min delay between node health check
            if (delayMillis <= MIN_CONNECTION_CHECK_TIMEOUT_MILLIS) {
              delayMillis = MIN_CONNECTION_CHECK_TIMEOUT_MILLIS;
            }
            // Use this delay as node checkout timeout since it corresponds to min interval for all active contexts
            this.nodeCheckTimeoutMillis = delayMillis;
          }

          this.sleep(delayMillis);

        } else {
          // Nothing to monitor: dispose of this monitor once it has been idle for long enough.
          if ((this.getCurrentTimeNano() - this.contextLastUsedTimestampNano)
              >= TimeUnit.MILLISECONDS.toNanos(this.monitorDisposalTimeMillis)) {
            // NOTE(review): releaseResource(this) is called again in the outer finally block;
            // presumably it tolerates a repeated call - confirm.
            threadContainer.releaseResource(this);
            break;
          }
          this.sleep(THREAD_SLEEP_WHEN_INACTIVE_MILLIS);
        }

      } catch (final InterruptedException intEx) {
        // Propagate to the outer handler, which ends the monitoring loop.
        throw intEx;
      } catch (final Exception ex) {
        // log and ignore
        if (LOGGER.isLoggable(Level.FINEST)) {
          LOGGER.log(
              Level.FINEST,
              Messages.get(
                  "MonitorImpl.exceptionDuringMonitoringContinue",
                  new Object[]{this.hostSpec.getHost()}),
              ex); // We want to print full trace stack of the exception.
        }
      }
    }
  } catch (final InterruptedException intEx) {
    // exit thread
    interrupted = true;
    LOGGER.warning(
        () -> Messages.get(
            "MonitorImpl.interruptedExceptionDuringMonitoring",
            new Object[] {this.hostSpec.getHost()}));
  } catch (final Exception ex) {
    // this should not be reached; log and exit thread
    if (LOGGER.isLoggable(Level.FINEST)) {
      LOGGER.log(
          Level.FINEST,
          Messages.get(
              "MonitorImpl.exceptionDuringMonitoringStop",
              new Object[]{this.hostSpec.getHost()}),
          ex); // We want to print full trace stack of the exception.
    }
  } finally {
    threadContainer.releaseResource(this);
    this.stopped = true;
    if (this.monitoringConn != null) {
      try {
        this.monitoringConn.close();
      } catch (final SQLException ex) {
        // ignore
      }
    }
    if (interrupted) {
      // Restore the interrupt status that was cleared when InterruptedException was thrown.
      // Done after the cleanup above so that closing the connection is not affected by it.
      Thread.currentThread().interrupt();
    }
  }

  LOGGER.finest(() -> Messages.get(
      "MonitorImpl.stopMonitoringThread",
      new Object[]{this.hostSpec.getHost()}));
}