in common/lib/plugins/aurora_initial_connection_strategy_plugin.ts [114:176]
async getVerifiedWriterClient<T>(props: Map<string, any>, isInitialConnection: boolean, connectFunc: () => Promise<T>): Promise<any> {
if (!this.hostListProviderService) {
throw new AwsWrapperError(Messages.get("HostListProviderService.notFound")); // should not be reached
}
const retryDelayMs = WrapperProperties.OPEN_CONNECTION_RETRY_INTERVAL_MS.get(props);
const endTimeMillis = Date.now() + WrapperProperties.OPEN_CONNECTION_RETRY_TIMEOUT_MS.get(props);
let writerCandidateClient: any;
let writerCandidate: HostInfo | void | null;
while (Date.now() < endTimeMillis) {
writerCandidateClient = null;
writerCandidate = null;
try {
writerCandidate = this.getWriter();
if (writerCandidate === null || this.rdsUtils.isRdsClusterDns(writerCandidate.host)) {
// Writer is not found. It seems that topology is outdated.
writerCandidateClient = await connectFunc();
await this.pluginService.forceRefreshHostList(writerCandidateClient);
writerCandidate = await this.pluginService.identifyConnection(writerCandidateClient);
if (writerCandidate) {
if (writerCandidate.role !== HostRole.WRITER) {
// Shouldn't be here. But let's try again.
await this.pluginService.abortTargetClient(writerCandidateClient);
await sleep(retryDelayMs);
continue;
}
if (isInitialConnection) {
this.hostListProviderService.setInitialConnectionHostInfo(writerCandidate);
}
}
return writerCandidateClient;
}
writerCandidateClient = await this.pluginService.connect(writerCandidate, props);
if ((await this.pluginService.getHostRole(writerCandidateClient)) !== HostRole.WRITER) {
// If the new connection resolves to a reader instance, this means the topology is outdated.
// Force refresh to update the topology.
await this.pluginService.forceRefreshHostList(writerCandidateClient);
await this.pluginService.abortTargetClient(writerCandidateClient);
await sleep(retryDelayMs);
continue;
}
// Writer connection is valid and verified.
if (isInitialConnection) {
this.hostListProviderService.setInitialConnectionHostInfo(writerCandidate);
}
return writerCandidateClient;
} catch (error: any) {
await this.pluginService.abortTargetClient(writerCandidateClient);
if (this.pluginService.isLoginError(error) || !writerCandidate) {
throw error;
} else if (writerCandidate) {
this.pluginService.setAvailability(writerCandidate.allAliases, HostAvailability.NOT_AVAILABLE);
}
}
}
}