in wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/ClusterTopologyMonitorImpl.java [287:442]
public void run() {
try {
LOGGER.finest(() -> Messages.get(
"ClusterTopologyMonitorImpl.startMonitoringThread",
new Object[]{this.initialHostSpec.getHost()}));
while (!this.stop.get()) {
if (this.isInPanicMode()) {
if (this.submittedNodes.isEmpty()) {
LOGGER.finest(Messages.get("ClusterTopologyMonitorImpl.startingNodeMonitoringThreads"));
// start node threads
this.nodeThreadsStop.set(false);
this.nodeThreadsWriterConnection.set(null);
this.nodeThreadsReaderConnection.set(null);
this.nodeThreadsWriterHostSpec.set(null);
this.nodeThreadsLatestTopology.set(null);
List<HostSpec> hosts = this.topologyMap.get(this.clusterId);
if (hosts == null) {
// need any connection to get topology
hosts = this.openAnyConnectionAndUpdateTopology();
}
this.shutdownNodeExecutorService();
this.createNodeExecutorService();
if (hosts != null && !this.isVerifiedWriterConnection) {
for (HostSpec hostSpec : hosts) {
this.submittedNodes.computeIfAbsent(hostSpec.getHost(),
(key) -> {
this.nodeExecutorService.submit(
this.getNodeMonitoringWorker(hostSpec, this.writerHostSpec.get()));
return true;
});
}
// It's not possible to call shutdown() on this.nodeExecutorService since more node may be added later.
}
// otherwise let's try it again the next round
} else {
// node threads are running
// check if writer is already detected
final Connection writerConnection = this.nodeThreadsWriterConnection.get();
final HostSpec writerConnectionHostSpec = this.nodeThreadsWriterHostSpec.get();
if (writerConnection != null && writerConnectionHostSpec != null) {
LOGGER.finest(
Messages.get(
"ClusterTopologyMonitorImpl.writerPickedUpFromNodeMonitors",
new Object[]{writerConnectionHostSpec}));
this.closeConnection(this.monitoringConnection.get());
this.monitoringConnection.set(writerConnection);
this.writerHostSpec.set(writerConnectionHostSpec);
this.isVerifiedWriterConnection = true;
this.highRefreshRateEndTimeNano = System.nanoTime() + highRefreshPeriodAfterPanicNano;
// We verify the writer on initial connection and on failover, but we only want to ignore new topology
// requests after failover. To accomplish this, the first time we verify the writer we set the ignore end
// time to 0. Any future writer verifications will set it to a positive value.
if (!this.ignoreNewTopologyRequestsEndTimeNano.compareAndSet(-1, 0)) {
this.ignoreNewTopologyRequestsEndTimeNano.set(System.nanoTime() + ignoreTopologyRequestNano);
}
this.nodeThreadsStop.set(true);
this.shutdownNodeExecutorService();
this.submittedNodes.clear();
continue;
} else {
// update node threads with new nodes in the topology
List<HostSpec> hosts = this.nodeThreadsLatestTopology.get();
if (hosts != null && !this.nodeThreadsStop.get()) {
for (HostSpec hostSpec : hosts) {
this.submittedNodes.computeIfAbsent(hostSpec.getHost(),
(key) -> {
this.nodeExecutorService.submit(
this.getNodeMonitoringWorker(hostSpec, this.writerHostSpec.get()));
return true;
});
}
// It's not possible to call shutdown() on this.nodeExecutorService since more node may be added later.
}
}
}
this.delay(true);
} else {
// regular mode (not panic mode)
if (!this.submittedNodes.isEmpty()) {
this.shutdownNodeExecutorService();
this.submittedNodes.clear();
}
final List<HostSpec> hosts = this.fetchTopologyAndUpdateCache(this.monitoringConnection.get());
if (hosts == null) {
// can't get topology
// let's switch to panic mode
Connection conn = this.monitoringConnection.get();
this.monitoringConnection.set(null);
this.isVerifiedWriterConnection = false;
this.closeConnection(conn);
continue;
}
if (this.highRefreshRateEndTimeNano > 0 && System.nanoTime() > this.highRefreshRateEndTimeNano) {
this.highRefreshRateEndTimeNano = 0;
}
// Do not log topology while in high refresh rate. It's noisy!
if (this.highRefreshRateEndTimeNano == 0) {
LOGGER.finest(Utils.logTopology(this.topologyMap.get(this.clusterId)));
}
this.delay(false);
}
if (this.ignoreNewTopologyRequestsEndTimeNano.get() > 0
&& System.nanoTime() > this.ignoreNewTopologyRequestsEndTimeNano.get()) {
this.ignoreNewTopologyRequestsEndTimeNano.set(0);
}
}
} catch (final InterruptedException intEx) {
Thread.currentThread().interrupt();
} catch (final Exception ex) {
// this should not be reached; log and exit thread
if (LOGGER.isLoggable(Level.FINEST)) {
// We want to print full trace stack of the exception.
LOGGER.log(
Level.FINEST,
Messages.get(
"ClusterTopologyMonitorImpl.exceptionDuringMonitoringStop",
new Object[]{this.initialHostSpec.getHost()}),
ex);
}
} finally {
this.stop.set(true);
this.shutdownNodeExecutorService();
final Connection conn = this.monitoringConnection.get();
this.monitoringConnection.set(null);
this.closeConnection(conn);
LOGGER.finest(() -> Messages.get(
"ClusterTopologyMonitorImpl.stopMonitoringThread",
new Object[]{this.initialHostSpec.getHost()}));
}
}