in common/lib/host_list_provider/monitoring/cluster_topology_monitor.ts [426:499]
/**
 * Polls this task's host until the parent monitor sets `hostMonitorsStop` or this host is
 * confirmed to be the writer.
 *
 * Each iteration:
 *  1. Opens a connection with `forceConnect` if none is held and records the host's availability.
 *  2. Asks the parent monitor for the writer host id over that connection.
 *  3. If a writer id is returned and `getHostRole` confirms the WRITER role, either hands the
 *     client over to the parent monitor (after refreshing the topology cache) or closes it when a
 *     writer client is already registered, then returns.
 *  4. Otherwise, while no writer client is registered, uses the connection to fetch topology via
 *     `readerTaskFetchTopology`.
 *
 * A failed connect, writer query or role check drops the current client and is retried on the
 * next iteration (100 ms later). Any other error ends the task. The client still referenced
 * locally is closed on exit.
 */
async run() {
  let client: ClientWrapper | null = null;
  let updateTopology = false;
  const startTime = Date.now();
  const logPrefix = `[hostMonitor ${this.hostInfo.hostId}] `;
  // Works whether the catch variable is typed `any` or `unknown`.
  const describeError = (e: unknown): string => (e instanceof Error ? e.message : String(e));

  logger.debug(Messages.get("HostMonitor.startMonitoring", this.hostInfo.hostId));
  try {
    while (!this.monitor.hostMonitorsStop) {
      if (!client) {
        try {
          client = await this.monitor.pluginService.forceConnect(this.hostInfo, this.monitor.monitoringProperties);
          this.monitor.pluginService.setAvailability(this.hostInfo.allAliases, HostAvailability.AVAILABLE);
        } catch (error) {
          // Connecting failed: flag the host as unavailable and try again on the next iteration.
          this.monitor.pluginService.setAvailability(this.hostInfo.allAliases, HostAvailability.NOT_AVAILABLE);
        }
      }

      if (client) {
        let writerId = null;
        try {
          writerId = await this.monitor.getWriterHostIdIfConnected(client, this.hostInfo.hostId);
        } catch (error) {
          logger.error(Messages.get("ClusterTopologyMonitor.invalidWriterQuery", error?.message));
          await this.monitor.closeConnection(client);
          client = null;
        }

        if (writerId) {
          // First connection after failover may be stale.
          try {
            if ((await this.monitor.pluginService.getHostRole(client)) !== HostRole.WRITER) {
              logger.debug(Messages.get("HostMonitor.writerIsStale", writerId));
              writerId = null;
            }
          } catch (error) {
            // A failed role check is handled like a failed writer query: drop the connection and
            // retry, instead of letting the error terminate this host's monitoring for good.
            logger.debug(`${logPrefix}unable to verify host role, reconnecting: ${describeError(error)}`);
            await this.monitor.closeConnection(client);
            client = null;
            writerId = null;
          }
        }

        if (writerId) {
          if (this.monitor.hostMonitorsWriterClient) {
            // A writer client is already registered on the parent monitor; this one is redundant.
            await this.monitor.closeConnection(client);
          } else {
            logger.debug(Messages.get("HostMonitor.detectedWriter", writerId, this.hostInfo.host));
            const updatedHosts: HostInfo[] = await this.monitor.fetchTopologyAndUpdateCache(client);
            // The writer slot is checked again here, after the await above;
            // presumably it may have been filled in the meantime - TODO confirm.
            if (updatedHosts && this.monitor.hostMonitorsWriterClient === null) {
              this.monitor.hostMonitorsWriterClient = client;
              this.monitor.hostMonitorsWriterInfo = this.hostInfo;
              this.monitor.hostMonitorsStop = true;
              this.monitor.logTopology(logPrefix);
            } else {
              await this.monitor.closeConnection(client);
            }
          }
          // The client was either handed over or already closed; clear the local reference so the
          // finally block does not close it.
          client = null;
          return;
        } else if (client) {
          // Client is a reader.
          if (!this.monitor.hostMonitorsWriterClient) {
            // While the writer hasn't been identified, reader client can update topology.
            if (updateTopology) {
              await this.readerTaskFetchTopology(client, this.writerHostInfo);
            } else if (this.monitor.hostMonitorsReaderClient === null) {
              // NOTE(review): this client is published to the parent monitor here but is still
              // closed by the finally block when this task ends - confirm that is intended.
              this.monitor.hostMonitorsReaderClient = client;
              updateTopology = true;
              await this.readerTaskFetchTopology(client, this.writerHostInfo);
            }
          }
        }
      }
      await sleep(100);
    }
  } catch (error) {
    // Close the monitor. The error is not rethrown, but leave a trace of why the task ended.
    logger.debug(`${logPrefix}monitoring ended by error: ${describeError(error)}`);
  } finally {
    await this.monitor.closeConnection(client);
    logger.debug(Messages.get("HostMonitor.endMonitoring", this.hostInfo.hostId, (Date.now() - startTime).toString()));
  }
}