async failover()

in common/lib/plugins/failover/writer_failover_handler.ts [72:156]


  /**
   * Attempts to establish a writer connection after a failure by racing the
   * failover tasks against an overall timeout of `maxFailoverTimeoutMs`.
   *
   * Task A is a `ReconnectToWriterHandlerTask` and task B is a
   * `WaitForNewWriterHandlerTask`. When the dialect's failover restrictions
   * include `FailoverRestriction.DISABLE_TASK_A`, task A is not started and
   * only task B's result is used. Otherwise the first task to settle is
   * inspected: if it is connected or carries an error it is returned, and if
   * not, the result of the other task is awaited instead.
   *
   * @param currentTopology - Hosts of the cluster as currently known; must be non-empty.
   * @returns The connected result of the winning task. Returns
   *   `ClusterAwareWriterFailoverHandler.DEFAULT_RESULT` when the topology is
   *   missing or empty, and an unconnected `WriterFailoverResult` when the
   *   overall timeout fires first.
   * @throws Whatever the race rejects with when it is not recognised as the
   *   timeout (see the note on `isTimeoutReason` below), and any error raised
   *   while cancelling the tasks.
   */
  async failover(currentTopology: HostInfo[]): Promise<WriterFailoverResult> {
    if (!currentTopology || currentTopology.length === 0) {
      logger.error(Messages.get("ClusterAwareWriterFailoverHandler.failoverCalledWithInvalidTopology"));
      return ClusterAwareWriterFailoverHandler.DEFAULT_RESULT;
    }

    const timeoutMessage = "Connection attempt task timed out.";

    // Resolve the restriction before any timer or task is started, so a failure
    // in this lookup cannot leave work running in the background.
    const singleTask: boolean = this.pluginService.getDialect().getFailoverRestrictions().includes(FailoverRestriction.DISABLE_TASK_A);

    const reconnectToWriterHandlerTask = new ReconnectToWriterHandlerTask(
      currentTopology,
      getWriter(currentTopology),
      this.pluginService,
      this.initialConnectionProps,
      this.reconnectionWriterIntervalMs,
      Date.now() + this.maxFailoverTimeoutMs
    );

    const waitForNewWriterHandlerTask = new WaitForNewWriterHandlerTask(
      currentTopology,
      getWriter(currentTopology),
      this.readerFailoverHandler,
      this.pluginService,
      this.initialConnectionProps,
      this.readTopologyIntervalMs,
      Date.now() + this.maxFailoverTimeoutMs
    );

    // The timer rejects with a plain string on purpose: the handler below
    // recognises the timeout by serialising the rejection reason.
    let timeoutId: ReturnType<typeof setTimeout> | undefined;
    const timeoutTask = new Promise<never>((_resolve, reject) => {
      timeoutId = setTimeout(() => {
        reject(timeoutMessage);
      }, this.maxFailoverTimeoutMs);
    });

    // Task A is only launched when it is not disabled by the dialect.
    const taskA = singleTask ? undefined : reconnectToWriterHandlerTask.call();
    const taskB = waitForNewWriterHandlerTask.call();

    let failed = false;
    let selectedTask = "";
    const firstSettledTask = taskA ? Promise.any([taskA, taskB]) : taskB;

    // Never rejects: any failure is folded into an unconnected result carrying the error.
    const failoverTask = (async () => {
      try {
        const firstResult = await firstSettledTask;
        selectedTask = firstResult.taskName;
        // If the first resolved promise is connected or has an error, return it.
        if (firstResult.isConnected || firstResult.error || singleTask) {
          return firstResult;
        }

        // Otherwise fall back to the result of the other task.
        if (selectedTask === ClusterAwareWriterFailoverHandler.RECONNECT_WRITER_TASK) {
          selectedTask = ClusterAwareWriterFailoverHandler.WAIT_NEW_WRITER_TASK;
          return await taskB;
        }
        if (selectedTask === ClusterAwareWriterFailoverHandler.WAIT_NEW_WRITER_TASK && taskA) {
          selectedTask = ClusterAwareWriterFailoverHandler.RECONNECT_WRITER_TASK;
          return await taskA;
        }
        return ClusterAwareWriterFailoverHandler.DEFAULT_RESULT;
      } catch (error: any) {
        return new WriterFailoverResult(false, false, [], "None", null, error);
      }
    })();

    // Serialises the rejection reason and looks for the timeout message.
    // JSON.stringify throws on circular structures and returns undefined for
    // values such as `undefined`; both cases are treated as "not a timeout" so
    // the original error is rethrown instead of being masked by a TypeError.
    // NOTE(review): whether an Error instance matches depends on its enumerable
    // properties, because `message` is not serialised by default.
    const isTimeoutReason = (reason: unknown): boolean => {
      try {
        const serialized: string | undefined = JSON.stringify(reason);
        return serialized !== undefined && serialized.includes(timeoutMessage);
      } catch {
        return false;
      }
    };

    try {
      const result = await Promise.race([timeoutTask, failoverTask]);
      if (result?.isConnected) {
        this.logTaskSuccess(result);
        return result;
      }
      throw new AwsWrapperError(timeoutMessage);
    } catch (error: unknown) {
      logger.info(Messages.get("ClusterAwareWriterFailoverHandler.failedToConnectToWriterInstance"));
      failed = true;
      if (isTimeoutReason(error)) {
        return new WriterFailoverResult(false, false, [], "None", null);
      }
      throw error;
    } finally {
      // Stop the timer first, and cancel task B even if cancelling task A fails,
      // so neither the timer nor a task is left running.
      clearTimeout(timeoutId);
      try {
        await reconnectToWriterHandlerTask.cancel(failed, selectedTask);
      } finally {
        await waitForNewWriterHandlerTask.cancel(selectedTask);
      }
    }
  }