async run()

in common/lib/host_list_provider/monitoring/cluster_topology_monitor.ts [426:499]


  /**
   * Polls the host described by `this.hostInfo` until a writer is detected through it or the
   * owning monitor sets `hostMonitorsStop`.
   *
   * On every iteration the method (re)connects when it holds no client, asks the monitor for the
   * writer ID over that client, and then takes one of two paths:
   *
   * - Writer detected: unless a writer client is already registered on the monitor, the topology
   *   cache is refreshed through this client and, on success, the client is handed over to the
   *   monitor as `hostMonitorsWriterClient` and `hostMonitorsStop` is set. In every other case the
   *   client is closed. The method returns on this path regardless of the outcome.
   * - No writer detected (reader): while no writer client is registered, the first host monitor to
   *   claim `hostMonitorsReaderClient` refreshes the topology through `readerTaskFetchTopology` on
   *   this and every following iteration.
   *
   * Errors raised inside the polling loop are logged and end the monitor; they are not rethrown.
   * Whatever client this method still holds is closed before it completes.
   */
  async run() {
    let client: ClientWrapper | null = null;
    let updateTopology: boolean = false;
    const startTime: number = Date.now();
    logger.debug(Messages.get("HostMonitor.startMonitoring", this.hostInfo.hostId));
    try {
      while (!this.monitor.hostMonitorsStop) {
        if (!client) {
          try {
            client = await this.monitor.pluginService.forceConnect(this.hostInfo, this.monitor.monitoringProperties);
            this.monitor.pluginService.setAvailability(this.hostInfo.allAliases, HostAvailability.AVAILABLE);
          } catch (error) {
            // Connection failed: mark the host unavailable and retry on the next iteration.
            this.monitor.pluginService.setAvailability(this.hostInfo.allAliases, HostAvailability.NOT_AVAILABLE);
          }
        }

        if (client) {
          let writerId = null;
          try {
            writerId = await this.monitor.getWriterHostIdIfConnected(client, this.hostInfo.hostId);
          } catch (error) {
            logger.error(Messages.get("ClusterTopologyMonitor.invalidWriterQuery", error?.message));
            // Drop the client so that the next iteration opens a fresh connection.
            await this.monitor.closeConnection(client);
            client = null;
          }

          if (writerId) {
            // First connection after failover may be stale.
            if ((await this.monitor.pluginService.getHostRole(client)) !== HostRole.WRITER) {
              logger.debug(Messages.get("HostMonitor.writerIsStale", writerId));
              writerId = null;
            }
          }

          if (writerId) {
            if (this.monitor.hostMonitorsWriterClient) {
              // A writer client is already registered on the monitor; this one is redundant.
              await this.monitor.closeConnection(client);
            } else {
              logger.debug(Messages.get("HostMonitor.detectedWriter", writerId, this.hostInfo.host));
              const updatedHosts: HostInfo[] = await this.monitor.fetchTopologyAndUpdateCache(client);
              // Re-check the writer slot: it may have been filled while the fetch above was awaited.
              if (updatedHosts && this.monitor.hostMonitorsWriterClient === null) {
                this.monitor.hostMonitorsWriterClient = client;
                this.monitor.hostMonitorsWriterInfo = this.hostInfo;
                this.monitor.hostMonitorsStop = true;
                this.monitor.logTopology(`[hostMonitor ${this.hostInfo.hostId}] `);
              } else {
                await this.monitor.closeConnection(client);
              }
            }
            // The client was either handed over to the monitor or closed above; clear the local
            // reference so the `finally` block does not close it again.
            client = null;
            return;
          } else if (client) {
            // Client is a reader.
            if (!this.monitor.hostMonitorsWriterClient) {
              // While the writer hasn't been identified, reader client can update topology.
              if (updateTopology) {
                await this.readerTaskFetchTopology(client, this.writerHostInfo);
              } else if (this.monitor.hostMonitorsReaderClient === null) {
                // Claim the reader slot; from now on this host monitor keeps refreshing topology.
                this.monitor.hostMonitorsReaderClient = client;
                updateTopology = true;
                await this.readerTaskFetchTopology(client, this.writerHostInfo);
              }
            }
          }
        }
        await sleep(100);
      }
    } catch (error) {
      // Any unexpected failure ends this host monitor. Record the reason instead of discarding it
      // silently, so that a monitor which stopped early can be diagnosed. The error is still not
      // rethrown.
      const reason = error instanceof Error ? error.message : String(error);
      logger.debug(`[hostMonitor ${this.hostInfo.hostId}] Monitoring stopped by an unexpected error: ${reason}`);
    } finally {
      // `client` is null here when it was already closed or handed over to the monitor.
      await this.monitor.closeConnection(client);
      logger.debug(Messages.get("HostMonitor.endMonitoring", this.hostInfo.hostId, (Date.now() - startTime).toString()));
    }
  }