in wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/ClusterTopologyMonitorImpl.java [827:915]
public void run() {
Connection connection = null;
boolean updateTopology = false;
final long start = System.nanoTime();
try {
while (!this.monitor.nodeThreadsStop.get()) {
if (connection == null) {
try {
connection = this.monitor.pluginService.forceConnect(
hostSpec, this.monitor.monitoringProperties);
this.monitor.pluginService.setAvailability(
hostSpec.asAliases(), HostAvailability.AVAILABLE);
} catch (SQLException ex) {
// connect issues
this.monitor.pluginService.setAvailability(
hostSpec.asAliases(), HostAvailability.NOT_AVAILABLE);
}
}
if (connection != null) {
String writerId = null;
try {
writerId = this.monitor.getWriterNodeId(connection);
} catch (SQLSyntaxErrorException ex) {
LOGGER.severe(() -> Messages.get("NodeMonitoringThread.invalidWriterQuery",
new Object[] {ex.getMessage()}));
throw new RuntimeException(ex);
} catch (SQLException ex) {
this.monitor.closeConnection(connection);
connection = null;
}
if (!StringUtils.isNullOrEmpty(writerId)) {
// this prevents closing connection in finally block
if (!this.monitor.nodeThreadsWriterConnection.compareAndSet(null, connection)) {
// writer connection is already setup
this.monitor.closeConnection(connection);
} else {
// writer connection is successfully set to writerConnection
LOGGER.fine(Messages.get("NodeMonitoringThread.detectedWriter", new Object[]{writerId}));
// When nodeThreadsWriterConnection and nodeThreadsWriterHostSpec are both set, the topology monitor may
// set ignoreNewTopologyRequestsEndTimeNano, in which case other threads will use the cached topology
// for the ignore duration, so we need to update the topology before setting nodeThreadsWriterHostSpec.
this.monitor.fetchTopologyAndUpdateCache(connection);
this.monitor.nodeThreadsWriterHostSpec.set(hostSpec);
this.monitor.nodeThreadsStop.set(true);
LOGGER.fine(Utils.logTopology(
this.monitor.topologyMap.get(this.monitor.clusterId)));
}
// Setting the connection to null here prevents the final block
// from closing nodeThreadsWriterConnection.
connection = null;
return;
} else if (connection != null) {
// this connection is a reader connection
if (this.monitor.nodeThreadsWriterConnection.get() == null) {
// while writer connection isn't yet established this reader connection may update topology
if (updateTopology) {
this.readerThreadFetchTopology(connection, writerHostSpec);
} else if (this.monitor.nodeThreadsReaderConnection.get() == null) {
if (this.monitor.nodeThreadsReaderConnection.compareAndSet(null, connection)) {
// let's use this connection to update topology
updateTopology = true;
this.readerThreadFetchTopology(connection, writerHostSpec);
}
}
}
}
}
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} finally {
this.monitor.closeConnection(connection);
final long end = System.nanoTime();
LOGGER.finest(() -> Messages.get("NodeMonitoringThread.threadCompleted",
new Object[] {TimeUnit.NANOSECONDS.toMillis(end - start)}));
}
}