async run()

in common/lib/host_list_provider/monitoring/cluster_topology_monitor.ts [276:393]


  /**
   * Main topology-monitoring loop; keeps iterating until `stopMonitoring` is set.
   *
   * Each iteration runs in one of two modes, selected by `isInPanicMode()`:
   *  - Panic mode (high refresh rate):
   *    - If no host monitors are running, it resets the host-monitor state and obtains a host list. It uses the
   *      cached topology, or `openAnyClientAndUpdateTopology()` when nothing is cached. It then starts one
   *      `HostMonitor` per host, unless the writer connection is already verified.
   *    - If host monitors are running and one of them has reported a writer client for a different host info, that
   *      client becomes the monitoring client, the writer is marked verified, and the host monitors are stopped.
   *    - Otherwise, monitors are started for any hosts newly reported in `hostMonitorsLatestTopology`.
   *    - The iteration ends with `delay(true)`.
   *  - Regular mode: stops any running host monitors and refreshes the topology through the monitoring client.
   *    - If that refresh returns `null`, the verified-writer flag and the monitoring client are cleared, which
   *      switches the monitor to panic mode.
   *    - Otherwise the iteration ends with `delay(false)`.
   *
   * `ignoreNewTopologyRequestsEndTimeMs` and `highRefreshRateEndTimeMs` use these values:
   *  - `-1`: sentinel handled specially on the first writer pickup.
   *  - `0`: no window is active.
   *  - positive: end timestamp in ms.
   *
   * Any error thrown inside the loop is logged and ends monitoring. On exit, the monitor is always flagged as
   * stopped and the monitoring client is released via `updateMonitoringClient(null)`.
   */
  async run(): Promise<void> {
    logger.debug(Messages.get("ClusterTopologyMonitor.startMonitoring"));
    try {
      while (!this.stopMonitoring) {
        if (this.isInPanicMode()) {
          // Panic Mode: high refresh rate in effect.

          if (this.hostMonitors.size === 0) {
            // Initialize host tasks.
            logger.debug(Messages.get("ClusterTopologyMonitor.startingHostMonitors"));
            this.hostMonitorsStop = false;
            // Release clients left over from a previous round of host monitoring before resetting the shared state.
            if (this.hostMonitorsReaderClient !== null) {
              await this.closeConnection(this.hostMonitorsReaderClient);
            }
            if (this.hostMonitorsWriterClient !== null) {
              await this.closeConnection(this.hostMonitorsWriterClient);
            }
            this.hostMonitorsWriterClient = null;
            this.hostMonitorsReaderClient = null;
            this.hostMonitorsWriterInfo = null;
            this.hostMonitorsLatestTopology = [];

            // Use any client to gather topology information.
            let hosts: HostInfo[] = this.topologyMap.get(this.clusterId);
            if (!hosts) {
              hosts = await this.openAnyClientAndUpdateTopology();
            }

            // Set up host monitors.
            if (hosts && !this.isVerifiedWriterConnection) {
              for (const hostInfo of hosts) {
                if (!this.hostMonitors.has(hostInfo.host)) {
                  const hostMonitor = new HostMonitor(this, hostInfo, this.writerHostInfo);
                  // Started without awaiting; the promise is kept in untrackedPromises.
                  const hostRun = hostMonitor.run();
                  this.hostMonitors.set(hostInfo.host, hostMonitor);
                  this.untrackedPromises.push(hostRun);
                }
              }
            }
            // If topology is not correctly updated, will try on the next round.
          } else {
            // Host monitors already running, check if a writer has been detected.
            const writerClient = this.hostMonitorsWriterClient;
            const writerHostInfo = this.hostMonitorsWriterInfo;
            if (writerClient && writerHostInfo && writerHostInfo !== this.writerHostInfo) {
              // Writer detected, update monitoringClient.
              logger.info(Messages.get("ClusterTopologyMonitor.writerPickedUpFromHostMonitors", writerHostInfo.hostId));
              await this.updateMonitoringClient(writerClient);
              this.writerHostInfo = writerHostInfo;
              this.isVerifiedWriterConnection = true;
              // A -1 sentinel is moved to 0 (no window) instead of opening a window. This is presumably so the
              // first writer verification does not start an ignore/high-refresh window; confirm with the field
              // initializers.
              if (this.ignoreNewTopologyRequestsEndTimeMs === -1) {
                this.ignoreNewTopologyRequestsEndTimeMs = 0;
              } else {
                this.ignoreNewTopologyRequestsEndTimeMs = Date.now() + this.ignoreTopologyRequestMs;
              }
              if (this.highRefreshRateEndTimeMs === -1) {
                this.highRefreshRateEndTimeMs = 0;
              } else {
                this.highRefreshRateEndTimeMs = Date.now() + this.highRefreshPeriodAfterPanicMs;
              }

              // Stop monitoring of each host, writer detected.
              this.hostMonitorsStop = true;
              this.hostMonitors.clear();
              // Skip the panic-mode delay so the next iteration can proceed right away.
              continue;
            } else {
              // No writer detected, update host monitors with any new hosts in the topology.
              const hosts: HostInfo[] = this.hostMonitorsLatestTopology;
              // Truthiness check guards both null and undefined; iterating either would throw and end monitoring.
              if (hosts && !this.hostMonitorsStop) {
                for (const hostInfo of hosts) {
                  if (!this.hostMonitors.has(hostInfo.host)) {
                    const hostMonitor = new HostMonitor(this, hostInfo, this.writerHostInfo);
                    // Started without awaiting; the promise is kept in untrackedPromises.
                    const hostRun = hostMonitor.run();
                    this.hostMonitors.set(hostInfo.host, hostMonitor);
                    this.untrackedPromises.push(hostRun);
                  }
                }
              }
            }
          }
          // Trigger a delay before retrying.
          await this.delay(true);
        } else {
          // Regular mode: lower refresh rate than panic mode.
          if (this.hostMonitors.size !== 0) {
            // Stop host monitors.
            this.hostMonitorsStop = true;
            this.hostMonitors.clear();
          }
          const hosts = await this.fetchTopologyAndUpdateCache(this.monitoringClient);
          if (hosts === null) {
            // Unable to gather topology, switch to panic mode.
            this.isVerifiedWriterConnection = false;
            await this.updateMonitoringClient(null);
            continue;
          }
          // Close an expired high refresh rate window.
          if (this.highRefreshRateEndTimeMs > 0 && Date.now() > this.highRefreshRateEndTimeMs) {
            this.highRefreshRateEndTimeMs = 0;
          }
          // Log topology only when not in a high refresh rate window. Both -1 (sentinel) and 0 (unset/expired) mean
          // no window is active. A strict `< 0` check would stop logging permanently once the value was reset to 0.
          if (this.highRefreshRateEndTimeMs <= 0) {
            this.logTopology(`[clusterTopologyMonitor] `);
          }
          // Set an easily interruptible delay between topology refreshes.
          await this.delay(false);
        }
        // Close an expired "ignore new topology requests" window.
        if (this.ignoreNewTopologyRequestsEndTimeMs > 0 && Date.now() > this.ignoreNewTopologyRequestsEndTimeMs) {
          this.ignoreNewTopologyRequestsEndTimeMs = 0;
        }
      }
    } catch (error) {
      // The thrown value is not guaranteed to be an Error; narrow before reading `message`.
      const message = error instanceof Error ? error.message : String(error);
      logger.error(Messages.get("ClusterTopologyMonitor.errorDuringMonitoring", message));
    } finally {
      this.stopMonitoring = true;
      await this.updateMonitoringClient(null);
      logger.debug(Messages.get("ClusterTopologyMonitor.endMonitoring"));
    }
  }