public void run()

in wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/ClusterTopologyMonitorImpl.java [287:442]


  /**
   * Main loop of the cluster topology monitor. Runs until {@code stop} is set, an
   * {@link InterruptedException} is raised, or an unexpected exception escapes the loop.
   *
   * <p>Each iteration runs in one of two modes, selected by {@code isInPanicMode()}:
   * <ul>
   *   <li><b>Panic mode.</b> If no node workers have been submitted yet, the shared node-thread state is
   *   reset, a host list is obtained (from the topology cache, or through
   *   {@code openAnyConnectionAndUpdateTopology()}), the node executor is recreated and one worker per host
   *   is submitted. Otherwise, if a writer connection and its host spec are available in
   *   {@code nodeThreadsWriterConnection} / {@code nodeThreadsWriterHostSpec}, that connection becomes the
   *   monitoring connection, the node workers are stopped and the loop continues immediately; if not,
   *   workers are submitted for any hosts in {@code nodeThreadsLatestTopology} that do not have one yet.</li>
   *   <li><b>Regular mode.</b> Any leftover node workers are shut down and the topology is fetched through
   *   the monitoring connection. If that yields {@code null}, the monitoring connection is dropped and
   *   closed and the loop continues immediately.</li>
   * </ul>
   *
   * <p>After a delayed iteration, an expired "ignore new topology requests" deadline is reset to 0.
   *
   * <p>On exit the {@code stop} flag is set, the node executor is shut down and the monitoring connection
   * is closed. An {@link InterruptedException} restores the thread's interrupt flag; any other exception
   * is logged at {@code FINEST} with its stack trace.
   */
  public void run() {
    try {
      LOGGER.finest(() -> Messages.get(
          "ClusterTopologyMonitorImpl.startMonitoringThread",
          new Object[]{this.initialHostSpec.getHost()}));

      while (!this.stop.get()) {

        if (this.isInPanicMode()) {

          if (this.submittedNodes.isEmpty()) {
            LOGGER.finest(Messages.get("ClusterTopologyMonitorImpl.startingNodeMonitoringThreads"));

            // start node threads: reset everything the node workers share with this thread
            this.nodeThreadsStop.set(false);
            this.nodeThreadsWriterConnection.set(null);
            this.nodeThreadsReaderConnection.set(null);
            this.nodeThreadsWriterHostSpec.set(null);
            this.nodeThreadsLatestTopology.set(null);

            List<HostSpec> hosts = this.topologyMap.get(this.clusterId);
            if (hosts == null) {
              // need any connection to get topology
              hosts = this.openAnyConnectionAndUpdateTopology();
            }

            // always start from a fresh executor before submitting workers
            this.shutdownNodeExecutorService();
            this.createNodeExecutorService();

            if (hosts != null && !this.isVerifiedWriterConnection) {
              this.submitNodeMonitoringWorkers(hosts);
            }
            // otherwise let's try it again the next round

          } else {
            // node threads are running
            // check if writer is already detected
            final Connection writerConnection = this.nodeThreadsWriterConnection.get();
            final HostSpec writerConnectionHostSpec = this.nodeThreadsWriterHostSpec.get();
            if (writerConnection != null && writerConnectionHostSpec != null) {
              LOGGER.finest(
                  Messages.get(
                      "ClusterTopologyMonitorImpl.writerPickedUpFromNodeMonitors",
                      new Object[]{writerConnectionHostSpec}));

              // Publish the new connection before closing the old one, so that monitoringConnection never
              // holds an already-closed connection. This matches the other replacement sites in this method.
              final Connection previousConnection = this.monitoringConnection.get();
              this.monitoringConnection.set(writerConnection);
              this.closeConnection(previousConnection);

              this.writerHostSpec.set(writerConnectionHostSpec);
              this.isVerifiedWriterConnection = true;
              this.highRefreshRateEndTimeNano = System.nanoTime() + highRefreshPeriodAfterPanicNano;

              // We verify the writer on initial connection and on failover, but we only want to ignore new topology
              // requests after failover. To accomplish this, the first time we verify the writer we set the ignore end
              // time to 0. Any future writer verifications will set it to a positive value.
              if (!this.ignoreNewTopologyRequestsEndTimeNano.compareAndSet(-1, 0)) {
                this.ignoreNewTopologyRequestsEndTimeNano.set(System.nanoTime() + ignoreTopologyRequestNano);
              }

              this.nodeThreadsStop.set(true);
              this.shutdownNodeExecutorService();
              this.submittedNodes.clear();

              // skip the delay and the ignore-deadline check; re-evaluate the mode right away
              continue;

            } else {
              // update node threads with new nodes in the topology
              final List<HostSpec> hosts = this.nodeThreadsLatestTopology.get();
              if (hosts != null && !this.nodeThreadsStop.get()) {
                this.submitNodeMonitoringWorkers(hosts);
              }
            }
          }

          this.delay(true);

        } else {
          // regular mode (not panic mode)

          if (!this.submittedNodes.isEmpty()) {
            // NOTE(review): unlike the writer-detected path above, nodeThreadsStop is not set before the
            // executor is shut down here (nor in the finally block below) - confirm that
            // shutdownNodeExecutorService() alone stops the workers promptly.
            this.shutdownNodeExecutorService();
            this.submittedNodes.clear();
          }

          final List<HostSpec> hosts = this.fetchTopologyAndUpdateCache(this.monitoringConnection.get());
          if (hosts == null) {
            // can't get topology
            // let's switch to panic mode
            Connection conn = this.monitoringConnection.get();
            this.monitoringConnection.set(null);
            this.isVerifiedWriterConnection = false;
            this.closeConnection(conn);
            continue;
          }

          if (this.highRefreshRateEndTimeNano > 0 && System.nanoTime() > this.highRefreshRateEndTimeNano) {
            this.highRefreshRateEndTimeNano = 0;
          }

          // Do not log topology while in high refresh rate. It's noisy!
          if (this.highRefreshRateEndTimeNano == 0) {
            LOGGER.finest(Utils.logTopology(this.topologyMap.get(this.clusterId)));
          }

          this.delay(false);
        }

        // Read the deadline once and clear it only if it is still the value that was checked, so that a
        // deadline written concurrently is not overwritten.
        final long ignoreEndTimeNano = this.ignoreNewTopologyRequestsEndTimeNano.get();
        if (ignoreEndTimeNano > 0 && System.nanoTime() > ignoreEndTimeNano) {
          this.ignoreNewTopologyRequestsEndTimeNano.compareAndSet(ignoreEndTimeNano, 0);
        }
      }

    } catch (final InterruptedException intEx) {
      // restore the interrupt flag for whoever owns this thread
      Thread.currentThread().interrupt();

    } catch (final Exception ex) {
      // this should not be reached; log and exit thread
      if (LOGGER.isLoggable(Level.FINEST)) {
        // We want to print full trace stack of the exception.
        LOGGER.log(
            Level.FINEST,
            Messages.get(
                "ClusterTopologyMonitorImpl.exceptionDuringMonitoringStop",
                new Object[]{this.initialHostSpec.getHost()}),
            ex);
      }

    } finally {
      this.stop.set(true);
      this.shutdownNodeExecutorService();

      // detach the connection first, then close it
      final Connection conn = this.monitoringConnection.get();
      this.monitoringConnection.set(null);
      this.closeConnection(conn);

      LOGGER.finest(() -> Messages.get(
          "ClusterTopologyMonitorImpl.stopMonitoringThread",
          new Object[]{this.initialHostSpec.getHost()}));
    }
  }

  /**
   * Submits a node monitoring worker to {@code nodeExecutorService} for every host in {@code hosts} that
   * has no entry in {@code submittedNodes} yet, and records the host as submitted.
   *
   * @param hosts the hosts to monitor; must not be {@code null}
   */
  private void submitNodeMonitoringWorkers(final List<HostSpec> hosts) {
    for (final HostSpec hostSpec : hosts) {
      this.submittedNodes.computeIfAbsent(hostSpec.getHost(),
          (key) -> {
            this.nodeExecutorService.submit(
                this.getNodeMonitoringWorker(hostSpec, this.writerHostSpec.get()));
            return true;
          });
    }
    // It's not possible to call shutdown() on this.nodeExecutorService since more node may be added later.
  }