in common/lib/plugins/failover/writer_failover_handler.ts [72:156]
/**
 * Tries to obtain a writer connection after a failover event.
 *
 * A reconnect-to-writer task ("task A") and a wait-for-new-writer task
 * ("task B") are started concurrently and raced against a timer of
 * `maxFailoverTimeoutMs`. When the dialect's failover restrictions include
 * `FailoverRestriction.DISABLE_TASK_A`, only task B's result is considered.
 * Otherwise the first task to settle is used if it is connected or carries an
 * error; if it is neither, the result of the other task is awaited instead.
 *
 * Both tasks are always asked to cancel before this method settles.
 *
 * @param currentTopology - Hosts of the cluster as currently known.
 * @returns The connected result of the selected task;
 *   `ClusterAwareWriterFailoverHandler.DEFAULT_RESULT` when the topology is
 *   missing or empty; otherwise a non-connected `WriterFailoverResult` when the
 *   timer expires or no task produced a connected result.
 * @throws Re-throws anything raised while reporting a successful result or
 *   while cancelling the tasks.
 */
async failover(currentTopology: HostInfo[]): Promise<WriterFailoverResult> {
  if (!currentTopology || currentTopology.length === 0) {
    logger.error(Messages.get("ClusterAwareWriterFailoverHandler.failoverCalledWithInvalidTopology"));
    return ClusterAwareWriterFailoverHandler.DEFAULT_RESULT;
  }

  const reconnectToWriterHandlerTask = new ReconnectToWriterHandlerTask(
    currentTopology,
    getWriter(currentTopology),
    this.pluginService,
    this.initialConnectionProps,
    this.reconnectionWriterIntervalMs,
    Date.now() + this.maxFailoverTimeoutMs
  );
  const waitForNewWriterHandlerTask = new WaitForNewWriterHandlerTask(
    currentTopology,
    getWriter(currentTopology),
    this.readerFailoverHandler,
    this.pluginService,
    this.initialConnectionProps,
    this.readTopologyIntervalMs,
    Date.now() + this.maxFailoverTimeoutMs
  );

  // The timer resolves with `undefined` instead of rejecting: a timeout is an
  // expected outcome, and a sentinel avoids having to recognise it later by
  // inspecting a serialized error.
  let timeoutId: ReturnType<typeof setTimeout> | undefined;
  const timeoutTask = new Promise<undefined>((resolve) => {
    timeoutId = setTimeout(() => resolve(undefined), this.maxFailoverTimeoutMs);
  });

  const taskA = reconnectToWriterHandlerTask.call();
  const taskB = waitForNewWriterHandlerTask.call();
  let failed = false;
  let selectedTask = "";
  const singleTask: boolean = this.pluginService.getDialect().getFailoverRestrictions().includes(FailoverRestriction.DISABLE_TASK_A);

  if (singleTask) {
    // Nothing else observes task A in this mode; swallow a rejection so it
    // cannot surface as an unhandled rejection.
    // NOTE(review): task A is still started even though it is disabled —
    // confirm whether it should be started at all.
    void taskA.catch(() => undefined);
  }

  // Never rejects: any failure is folded into a non-connected result.
  const failoverTask: Promise<WriterFailoverResult> = (async () => {
    try {
      const firstResult = await (singleTask ? taskB : Promise.any([taskA, taskB]));
      selectedTask = firstResult.taskName;
      // If the first settled task is connected or has an error, use it.
      if (firstResult.isConnected || firstResult.error || singleTask) {
        return firstResult;
      }
      // Otherwise fall back to the other task's result.
      if (selectedTask === ClusterAwareWriterFailoverHandler.RECONNECT_WRITER_TASK) {
        selectedTask = ClusterAwareWriterFailoverHandler.WAIT_NEW_WRITER_TASK;
        return await taskB;
      }
      if (selectedTask === ClusterAwareWriterFailoverHandler.WAIT_NEW_WRITER_TASK) {
        selectedTask = ClusterAwareWriterFailoverHandler.RECONNECT_WRITER_TASK;
        return await taskA;
      }
      return ClusterAwareWriterFailoverHandler.DEFAULT_RESULT;
      // eslint-disable-next-line @typescript-eslint/no-explicit-any -- forwarded as-is to WriterFailoverResult
    } catch (error: any) {
      return new WriterFailoverResult(false, false, [], "None", null, error);
    }
  })();

  try {
    const result = await Promise.race([timeoutTask, failoverTask]);
    if (result && result.isConnected) {
      this.logTaskSuccess(result);
      return result;
    }
    // Timer expired (result is undefined) or the selected task did not connect.
    failed = true;
    logger.info(Messages.get("ClusterAwareWriterFailoverHandler.failedToConnectToWriterInstance"));
    return new WriterFailoverResult(false, false, [], "None", null);
  } catch (error: unknown) {
    // Only unexpected failures reach here (e.g. while reporting success).
    failed = true;
    logger.info(Messages.get("ClusterAwareWriterFailoverHandler.failedToConnectToWriterInstance"));
    throw error;
  } finally {
    // Clear the timer first so it is released even if a cancel call throws.
    clearTimeout(timeoutId);
    await reconnectToWriterHandlerTask.cancel(failed, selectedTask);
    await waitForNewWriterHandlerTask.cancel(selectedTask);
  }
}