public void run()

in wrapper/src/main/java/software/amazon/jdbc/plugin/efm/MonitorImpl.java [142:301]


  /**
   * Main loop of the monitoring thread for {@code this.hostSpec}.
   *
   * <p>Each iteration of the loop:
   * <ol>
   *   <li>drains {@code newContexts}: active contexts whose expected active-monitoring start time
   *       has arrived are moved to {@code activeContexts}; active contexts that are not yet due are
   *       put back into {@code newContexts}; inactive contexts are dropped;</li>
   *   <li>if there is at least one active context, or the monitoring connection is {@code null} or
   *       closed, performs one connection status check and applies its result to every context in
   *       {@code activeContexts}, then sleeps for the smallest failure detection interval among the
   *       contexts that remain monitored (less the time the check took, but never below
   *       {@code MIN_CONNECTION_CHECK_TIMEOUT_MILLIS});</li>
   *   <li>otherwise sleeps for {@code THREAD_SLEEP_WHEN_INACTIVE_MILLIS}, or leaves the loop once
   *       no status check has been performed for {@code monitorDisposalTimeMillis}.</li>
   * </ol>
   *
   * <p>An {@link InterruptedException} ends the loop. Any other exception raised inside an
   * iteration is logged at {@code FINEST} and the loop continues. On exit the monitor is released
   * from {@code threadContainer}, {@code stopped} is set to {@code true} and the monitoring
   * connection, if any, is closed.
   */
  public void run() {

    LOGGER.finest(() -> Messages.get(
        "MonitorImpl.startMonitoringThread",
        new Object[]{this.hostSpec.getHost()}));

    try {
      this.stopped = false;
      while (true) {
        try {

          // process new contexts
          MonitorConnectionContext newMonitorContext;
          // Sentinel: the first context put back into newContexts during this pass. Polling it
          // again means every queued context has been looked at once.
          MonitorConnectionContext firstAddedNewMonitorContext = null;
          final long currentTimeNano = this.getCurrentTimeNano();

          while ((newMonitorContext = this.newContexts.poll()) != null) {
            if (firstAddedNewMonitorContext == newMonitorContext) {
              // This context has already been processed.
              // Add it back to the queue and process it in the next round.
              this.newContexts.add(newMonitorContext);
              break;
            }
            if (newMonitorContext.isActiveContext()) {
              if (newMonitorContext.getExpectedActiveMonitoringStartTimeNano() > currentTimeNano) {
                // The context active monitoring time hasn't come.
                // Add the context to the queue and check it later.
                this.newContexts.add(newMonitorContext);
                if (firstAddedNewMonitorContext == null) {
                  firstAddedNewMonitorContext = newMonitorContext;
                }
              } else {
                // It's time to start actively monitoring this context.
                this.activeContexts.add(newMonitorContext);
              }
            }
            // Inactive contexts are neither re-queued nor promoted, i.e. they are dropped here.
          }

          if (!this.activeContexts.isEmpty()
              || this.monitoringConn == null
              || this.monitoringConn.isClosed()) {

            final long statusCheckStartTimeNano = this.getCurrentTimeNano();
            this.contextLastUsedTimestampNano = statusCheckStartTimeNano;

            // One status check per round; its result is shared by all active contexts below.
            final ConnectionStatus status = checkConnectionStatus(this.nodeCheckTimeoutMillis);

            // -1 means "no context remains under active monitoring after this round".
            long delayMillis = -1;
            MonitorConnectionContext monitorContext;
            // Sentinel: the first context put back into activeContexts during this pass.
            MonitorConnectionContext firstAddedMonitorContext = null;

            while ((monitorContext = this.activeContexts.poll()) != null) {

              monitorContext.getLock().lock();
              try {
                // The sentinel check must come before the inactive check. If the sentinel was
                // deactivated after being re-queued and were simply skipped, nothing would end
                // this pass while other contexts keep being re-queued: the loop would spin
                // without sleeping and re-apply the same status to them over and over.
                if (firstAddedMonitorContext == monitorContext) {
                  // this context has already been processed by this loop
                  // keep it in the queue only if it is still active, then exit this loop
                  if (monitorContext.isActiveContext()) {
                    this.activeContexts.add(monitorContext);
                  }
                  break;
                }

                // If context is already invalid, just skip it
                if (!monitorContext.isActiveContext()) {
                  continue;
                }

                // otherwise, process this context
                monitorContext.updateConnectionStatus(
                    this.hostSpec.getUrl(),
                    statusCheckStartTimeNano,
                    statusCheckStartTimeNano + status.elapsedTimeNano,
                    status.isValid);

                // If context is still valid and node is still healthy, it needs to continue updating this context
                if (monitorContext.isActiveContext() && !monitorContext.isNodeUnhealthy()) {
                  this.activeContexts.add(monitorContext);
                  if (firstAddedMonitorContext == null) {
                    firstAddedMonitorContext = monitorContext;
                  }

                  // Track the smallest failure detection interval among monitored contexts.
                  if (delayMillis == -1 || delayMillis > monitorContext.getFailureDetectionIntervalMillis()) {
                    delayMillis = monitorContext.getFailureDetectionIntervalMillis();
                  }
                }
              } finally {
                // Runs for the continue and break paths as well, so the lock is always released.
                monitorContext.getLock().unlock();
              }
            }

            if (delayMillis == -1) {
              // No active contexts
              delayMillis = THREAD_SLEEP_WHEN_INACTIVE_MILLIS;
            } else {
              // The status check already consumed part of the interval.
              delayMillis -= TimeUnit.NANOSECONDS.toMillis(status.elapsedTimeNano);
              // Check for min delay between node health check
              if (delayMillis <= MIN_CONNECTION_CHECK_TIMEOUT_MILLIS) {
                delayMillis = MIN_CONNECTION_CHECK_TIMEOUT_MILLIS;
              }
              // Use this delay as node check timeout since it corresponds to min interval for all active contexts
              this.nodeCheckTimeoutMillis = delayMillis;
            }

            this.sleep(delayMillis);

          } else {
            // Nothing to monitor and the monitoring connection is open: dispose of this monitor
            // once it has been idle for the configured disposal time.
            if ((this.getCurrentTimeNano() - this.contextLastUsedTimestampNano)
                >= TimeUnit.MILLISECONDS.toNanos(this.monitorDisposalTimeMillis)) {
              // NOTE(review): the finally block below calls releaseResource(this) a second time;
              // presumably the call is idempotent - confirm against the thread container.
              threadContainer.releaseResource(this);
              break;
            }
            this.sleep(THREAD_SLEEP_WHEN_INACTIVE_MILLIS);
          }

        } catch (final InterruptedException intEx) {
          // Propagate to the outer handler so that an interrupt terminates the loop
          // instead of being swallowed by the generic handler below.
          throw intEx;
        } catch (final Exception ex) {
          // log and ignore
          if (LOGGER.isLoggable(Level.FINEST)) {
            LOGGER.log(
                Level.FINEST,
                Messages.get(
                    "MonitorImpl.exceptionDuringMonitoringContinue",
                    new Object[]{this.hostSpec.getHost()}),
                ex); // We want to print full trace stack of the exception.
          }
        }
      }
    } catch (final InterruptedException intEx) {
      // exit thread
      LOGGER.warning(
          () -> Messages.get(
              "MonitorImpl.interruptedExceptionDuringMonitoring",
              new Object[] {this.hostSpec.getHost()}));
    } catch (final Exception ex) {
      // this should not be reached; log and exit thread
      if (LOGGER.isLoggable(Level.FINEST)) {
        LOGGER.log(
            Level.FINEST,
            Messages.get(
                "MonitorImpl.exceptionDuringMonitoringStop",
                new Object[]{this.hostSpec.getHost()}),
            ex); // We want to print full trace stack of the exception.
      }
    } finally {
      threadContainer.releaseResource(this);
      this.stopped = true;
      if (this.monitoringConn != null) {
        try {
          this.monitoringConn.close();
        } catch (final SQLException ex) {
          // ignore
        }
      }
    }

    LOGGER.finest(() -> Messages.get(
        "MonitorImpl.stopMonitoringThread",
        new Object[]{this.hostSpec.getHost()}));
  }