in wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java [514:609]
protected void failoverWriter() throws SQLException {
TelemetryFactory telemetryFactory = this.pluginService.getTelemetryFactory();
TelemetryContext telemetryContext = telemetryFactory.openTelemetryContext(
TELEMETRY_WRITER_FAILOVER, TelemetryTraceLevel.NESTED);
this.failoverWriterTriggeredCounter.inc();
long failoverStartTimeNano = System.nanoTime();
try {
LOGGER.info(() -> Messages.get("Failover.startWriterFailover"));
// It's expected that this method synchronously returns when topology is stabilized,
// i.e. when cluster control plane has already chosen a new writer.
if (!this.pluginService.forceRefreshHostList(true, this.failoverTimeoutMsSetting)) {
this.failoverWriterFailedCounter.inc();
LOGGER.severe(Messages.get("Failover.unableToRefreshHostList"));
throw new FailoverFailedSQLException(Messages.get("Failover.unableToRefreshHostList"));
}
final List<HostSpec> updatedHosts = this.pluginService.getAllHosts();
Connection writerCandidateConn;
final HostSpec writerCandidate = updatedHosts.stream()
.filter(x -> x.getRole() == HostRole.WRITER)
.findFirst()
.orElse(null);
if (writerCandidate == null) {
this.failoverWriterFailedCounter.inc();
String message = Utils.logTopology(updatedHosts, Messages.get("Failover.noWriterHost"));
LOGGER.severe(message);
throw new FailoverFailedSQLException(message);
}
final List<HostSpec> allowedHosts = this.pluginService.getHosts();
if (!Utils.containsUrl(allowedHosts, writerCandidate.getUrl())) {
this.failoverWriterFailedCounter.inc();
String topologyString = Utils.logTopology(allowedHosts, "");
LOGGER.severe(Messages.get("Failover.newWriterNotAllowed",
new Object[] {writerCandidate.getUrl(), topologyString}));
throw new FailoverFailedSQLException(
Messages.get("Failover.newWriterNotAllowed",
new Object[] {writerCandidate.getUrl(), topologyString}));
}
try {
writerCandidateConn = this.pluginService.connect(writerCandidate, this.properties, this);
} catch (SQLException ex) {
this.failoverWriterFailedCounter.inc();
LOGGER.severe(
Messages.get("Failover.exceptionConnectingToWriter", new Object[]{writerCandidate.getHost()}));
throw new FailoverFailedSQLException(
Messages.get("Failover.exceptionConnectingToWriter", new Object[]{writerCandidate.getHost()}), ex);
}
HostRole role = this.pluginService.getHostRole(writerCandidateConn);
if (role != HostRole.WRITER) {
try {
writerCandidateConn.close();
} catch (SQLException ex) {
// do nothing
}
this.failoverWriterFailedCounter.inc();
LOGGER.severe(
Messages.get("Failover.unexpectedReaderRole", new Object[]{writerCandidate.getHost(), role}));
throw new FailoverFailedSQLException(
Messages.get("Failover.unexpectedReaderRole", new Object[]{writerCandidate.getHost(), role}));
}
this.pluginService.setCurrentConnection(writerCandidateConn, writerCandidate);
LOGGER.fine(
() -> Messages.get(
"Failover.establishedConnection",
new Object[]{this.pluginService.getCurrentHostSpec()}));
throwFailoverSuccessException();
} catch (FailoverSuccessSQLException ex) {
this.failoverWriterSuccessCounter.inc();
telemetryContext.setSuccess(true);
telemetryContext.setException(ex);
throw ex;
} catch (Exception ex) {
telemetryContext.setSuccess(false);
telemetryContext.setException(ex);
this.failoverWriterFailedCounter.inc();
throw ex;
} finally {
LOGGER.finest(() -> Messages.get(
"Failover.writerFailoverElapsed",
new Object[]{TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - failoverStartTimeNano)}));
telemetryContext.closeContext();
if (this.telemetryFailoverAdditionalTopTraceSetting) {
telemetryFactory.postCopy(telemetryContext, TelemetryTraceLevel.FORCE_TOP_LEVEL);
}
}
}