async getVerifiedWriterClient()

in common/lib/plugins/aurora_initial_connection_strategy_plugin.ts [114:176]


  /**
   * Repeatedly tries to obtain a client that is connected to a writer instance,
   * until one is found or the retry window configured by
   * `OPEN_CONNECTION_RETRY_TIMEOUT_MS` has elapsed.
   *
   * Each attempt looks up the writer via `this.getWriter()`:
   * - If no writer is known, or its host is a cluster DNS name, a connection is
   *   opened through `connectFunc`, the host list is force-refreshed over that
   *   connection, and the connection is identified to learn which host it hit.
   * - Otherwise a connection is opened directly to the writer candidate and its
   *   role is checked; a non-writer role triggers a topology refresh and a retry.
   *
   * @param props - Connection properties; the retry interval and retry timeout are read from it.
   * @param isInitialConnection - When true, the identified writer is recorded through
   *   `hostListProviderService.setInitialConnectionHostInfo`.
   * @param connectFunc - Callback that opens a connection; used when no concrete writer host is known.
   * @returns The writer client, or `undefined` if the retry window elapses without success.
   * @throws AwsWrapperError if no host list provider service is set.
   * @throws Rethrows the caught error when it is a login error or when no writer candidate
   *   was known at the time of the failure.
   */
  async getVerifiedWriterClient<T>(props: Map<string, any>, isInitialConnection: boolean, connectFunc: () => Promise<T>): Promise<any> {
    if (!this.hostListProviderService) {
      throw new AwsWrapperError(Messages.get("HostListProviderService.notFound")); // should not be reached
    }
    const retryDelayMs = WrapperProperties.OPEN_CONNECTION_RETRY_INTERVAL_MS.get(props);
    const endTimeMillis = Date.now() + WrapperProperties.OPEN_CONNECTION_RETRY_TIMEOUT_MS.get(props);

    while (Date.now() < endTimeMillis) {
      // Scoped per attempt so a stale client/host from a previous iteration can never leak into the catch block.
      let writerCandidateClient: any = null;
      let writerCandidate: HostInfo | void | null = null;

      try {
        writerCandidate = this.getWriter();

        // Falsy check (rather than `=== null`) so an undefined writer is also treated as "not found".
        if (!writerCandidate || this.rdsUtils.isRdsClusterDns(writerCandidate.host)) {
          // Writer is not found. It seems that topology is outdated.
          writerCandidateClient = await connectFunc();
          await this.pluginService.forceRefreshHostList(writerCandidateClient);
          writerCandidate = await this.pluginService.identifyConnection(writerCandidateClient);

          if (writerCandidate) {
            if (writerCandidate.role !== HostRole.WRITER) {
              // Shouldn't be here. But let's try again.
              await this.pluginService.abortTargetClient(writerCandidateClient);
              await sleep(retryDelayMs);
              continue;
            }

            if (isInitialConnection) {
              this.hostListProviderService.setInitialConnectionHostInfo(writerCandidate);
            }
          }
          // NOTE(review): when the connection could not be identified, the client is returned
          // without a role check — confirm this is intended for a "verified" writer.
          return writerCandidateClient;
        }

        writerCandidateClient = await this.pluginService.connect(writerCandidate, props);

        if ((await this.pluginService.getHostRole(writerCandidateClient)) !== HostRole.WRITER) {
          // If the new connection resolves to a reader instance, this means the topology is outdated.
          // Force refresh to update the topology.
          await this.pluginService.forceRefreshHostList(writerCandidateClient);
          await this.pluginService.abortTargetClient(writerCandidateClient);
          await sleep(retryDelayMs);
          continue;
        }

        // Writer connection is valid and verified.
        if (isInitialConnection) {
          this.hostListProviderService.setInitialConnectionHostInfo(writerCandidate);
        }
        return writerCandidateClient;
      } catch (error: any) {
        await this.pluginService.abortTargetClient(writerCandidateClient);

        // Login errors are not retried, and without a known candidate there is no host to mark as unavailable.
        if (this.pluginService.isLoginError(error) || !writerCandidate) {
          throw error;
        }

        this.pluginService.setAvailability(writerCandidate.allAliases, HostAvailability.NOT_AVAILABLE);
        // Back off like the other retry paths so a fast-failing host is not hammered in a tight loop.
        await sleep(retryDelayMs);
      }
    }

    // Retry window elapsed without obtaining a writer client.
    return undefined;
  }