in common/lib/host_list_provider/monitoring/cluster_topology_monitor.ts [276:393]
/**
 * Main topology-monitoring loop; runs until `stopMonitoring` becomes true or an
 * error escapes the loop.
 *
 * Each iteration picks a mode via `isInPanicMode()`:
 * - Panic mode: starts one `HostMonitor` per known host (topology taken from
 *   `topologyMap`, or gathered through `openAnyClientAndUpdateTopology()` when
 *   absent) and waits for one of them to report a writer client. Once reported,
 *   that client becomes the monitoring client and the host monitors are stopped.
 * - Regular mode: stops any host monitors and refreshes topology through the
 *   monitoring client. If the refresh returns null, the monitoring client is
 *   dropped so the next iteration falls back to panic mode.
 *
 * Errors are logged rather than rethrown. On exit `stopMonitoring` is set and the
 * monitoring client is released through `updateMonitoringClient(null)`.
 */
async run(): Promise<void> {
  logger.debug(Messages.get("ClusterTopologyMonitor.startMonitoring"));

  // Starts a HostMonitor for every host in `hosts` that is not monitored yet.
  // The run() promises are collected in `untrackedPromises` instead of being awaited here.
  const startMissingHostMonitors = (hosts: HostInfo[]): void => {
    for (const hostInfo of hosts) {
      if (!this.hostMonitors.has(hostInfo.host)) {
        const hostMonitor = new HostMonitor(this, hostInfo, this.writerHostInfo);
        const hostRun = hostMonitor.run();
        this.hostMonitors.set(hostInfo.host, hostMonitor);
        this.untrackedPromises.push(hostRun);
      }
    }
  };

  try {
    while (!this.stopMonitoring) {
      if (this.isInPanicMode()) {
        // Panic mode: high refresh rate in effect.
        if (this.hostMonitors.size === 0) {
          // Initialize host monitors, discarding clients left over from a previous panic round.
          logger.debug(Messages.get("ClusterTopologyMonitor.startingHostMonitors"));
          this.hostMonitorsStop = false;
          if (this.hostMonitorsReaderClient) {
            await this.closeConnection(this.hostMonitorsReaderClient);
          }
          if (this.hostMonitorsWriterClient) {
            await this.closeConnection(this.hostMonitorsWriterClient);
          }
          this.hostMonitorsWriterClient = null;
          this.hostMonitorsReaderClient = null;
          this.hostMonitorsWriterInfo = null;
          this.hostMonitorsLatestTopology = [];

          // Use any client to gather topology information.
          let hosts: HostInfo[] = this.topologyMap.get(this.clusterId);
          if (!hosts) {
            hosts = await this.openAnyClientAndUpdateTopology();
          }
          if (hosts && !this.isVerifiedWriterConnection) {
            startMissingHostMonitors(hosts);
          }
          // If topology is not correctly updated, will try on the next round.
        } else {
          // Host monitors already running: check whether one of them detected a writer.
          const writerClient = this.hostMonitorsWriterClient;
          const writerHostInfo = this.hostMonitorsWriterInfo;
          // NOTE(review): `!==` compares HostInfo object references, not host identity —
          // confirm host monitors never report the current `this.writerHostInfo` object itself.
          if (writerClient && writerHostInfo && writerHostInfo !== this.writerHostInfo) {
            // Writer detected, update monitoringClient.
            logger.info(Messages.get("ClusterTopologyMonitor.writerPickedUpFromHostMonitors", writerHostInfo.hostId));
            await this.updateMonitoringClient(writerClient);
            // writerClient is now owned by the monitoring side. Drop the host-monitor reference
            // so the cleanup at the start of the next panic round does not close it a second
            // time (or close it while it is still the active monitoring client).
            if (this.hostMonitorsWriterClient === writerClient) {
              this.hostMonitorsWriterClient = null;
            }
            this.writerHostInfo = writerHostInfo;
            this.isVerifiedWriterConnection = true;

            // -1 acts as an initial sentinel: the first writer verification only resets these
            // timers to 0, while later verifications open a new time window.
            this.ignoreNewTopologyRequestsEndTimeMs =
              this.ignoreNewTopologyRequestsEndTimeMs === -1 ? 0 : Date.now() + this.ignoreTopologyRequestMs;
            this.highRefreshRateEndTimeMs =
              this.highRefreshRateEndTimeMs === -1 ? 0 : Date.now() + this.highRefreshPeriodAfterPanicMs;

            // Stop monitoring of each host, writer detected.
            this.hostMonitorsStop = true;
            this.hostMonitors.clear();
            continue;
          }

          // No writer detected yet: start monitors for any new hosts reported by the host monitors.
          const latestHosts = this.hostMonitorsLatestTopology;
          if (latestHosts && !this.hostMonitorsStop) {
            startMissingHostMonitors(latestHosts);
          }
        }
        // Trigger a delay before retrying.
        await this.delay(true);
      } else {
        // Regular mode: lower refresh rate than panic mode.
        if (this.hostMonitors.size !== 0) {
          // Stop host monitors.
          this.hostMonitorsStop = true;
          this.hostMonitors.clear();
        }
        const hosts = await this.fetchTopologyAndUpdateCache(this.monitoringClient);
        if (hosts === null) {
          // Unable to gather topology: drop the monitoring client and switch to panic mode.
          this.isVerifiedWriterConnection = false;
          await this.updateMonitoringClient(null);
          continue;
        }
        if (this.highRefreshRateEndTimeMs > 0 && Date.now() > this.highRefreshRateEndTimeMs) {
          // The high-refresh window has expired.
          this.highRefreshRateEndTimeMs = 0;
        }
        // Log topology only outside a high-refresh window. Both 0 and the initial -1 sentinel
        // mean "no window active"; a `< 0` check alone stops logging permanently once the
        // first writer verification resets the value to 0.
        if (this.highRefreshRateEndTimeMs <= 0) {
          this.logTopology(`[clusterTopologyMonitor] `);
        }
        // Set an easily interruptible delay between topology refreshes.
        await this.delay(false);
      }
      if (this.ignoreNewTopologyRequestsEndTimeMs > 0 && Date.now() > this.ignoreNewTopologyRequestsEndTimeMs) {
        // The window for ignoring new topology requests has expired.
        this.ignoreNewTopologyRequestsEndTimeMs = 0;
      }
    }
  } catch (error: unknown) {
    // The catch variable is `unknown`; narrow before reading `.message`.
    const message = error instanceof Error ? error.message : String(error);
    logger.error(Messages.get("ClusterTopologyMonitor.errorDuringMonitoring", message));
  } finally {
    this.stopMonitoring = true;
    await this.updateMonitoringClient(null);
    logger.debug(Messages.get("ClusterTopologyMonitor.endMonitoring"));
  }
}