in src/QldbDriver.ts [158:251]
/**
 * Runs a transaction lambda on a session taken from the pool (or a newly created one),
 * retrying when the failure is reported as retryable.
 *
 * @param transactionLambda Callback handed to `QldbSession.executeLambda`; its resolved value is returned.
 * @param retryConfig Optional retry settings. When null or undefined, the driver's `_retryConfig` is used.
 * @returns The value produced by `session.executeLambda(transactionLambda)`.
 * @throws DriverClosedError When the driver is already marked closed on entry.
 * @throws SessionPoolEmptyError When no semaphore permit can be acquired.
 * @throws The `cause` of an `ExecuteError` when it is not retryable or the retry limit is exceeded;
 *         any error that is not an `ExecuteError` is rethrown unchanged.
 */
async executeLambda<T>(
    transactionLambda: (transactionExecutor: TransactionExecutor) => Promise<T>,
    retryConfig?: RetryConfig
): Promise<T> {
    if (this._isClosed) {
        throw new DriverClosedError();
    }

    // Takes a semaphore permit, then yields either a pooled session or a brand new one.
    const acquireSession = async (forceNew: boolean): Promise<QldbSession> => {
        debug(
            `Getting session. Current free session count: ${this._sessionPool.length}. ` +
            `Currently available permit count: ${this._availablePermits}.`
        );
        if (!this._semaphore.tryAcquire()) {
            throw new SessionPoolEmptyError()
        }
        this._availablePermits--;
        try {
            // Skip the pool entirely when a new session was requested.
            let candidate: QldbSession = forceNew ? undefined : this._sessionPool.pop();
            if (candidate == undefined) {
                debug("Creating a new session.");
                const communicator: Communicator =
                    await Communicator.create(this._qldbClient, this._ledgerName);
                candidate = new QldbSession(communicator);
            }
            return candidate;
        } catch (creationError) {
            // Give the permit back: no session was handed out for it.
            this._semaphore.release();
            this._availablePermits++;
            // An error when failing to start a new session is always retryable
            throw new ExecuteError(creationError, true, true);
        }
    };

    // Gives the permit back; returns true only when a live session went back into the pool.
    const returnSession = (used: QldbSession): boolean => {
        if (used == null) {
            return false;
        }
        this._semaphore.release();
        this._availablePermits++;
        if (!session_isAlive(used)) {
            return false;
        }
        this._sessionPool.push(used);
        return true;

        // Local wrapper kept trivial so the liveness check reads as a guard above.
        function session_isAlive(s: QldbSession): boolean {
            return s.isAlive();
        }
    };

    const activeRetryConfig: RetryConfig = (retryConfig != null) ? retryConfig : this._retryConfig;
    let forceNewSession: boolean = false;
    // The attempt counter advances on every pass, including the stale-session retry below.
    for (let attempt: number = 1; ; attempt++) {
        let current: QldbSession = null;
        try {
            current = await acquireSession(forceNewSession);
            return await current.executeLambda(transactionLambda);
        } catch (failure) {
            if (!(failure instanceof ExecuteError)) {
                throw failure;
            }
            if (!failure.isRetryable) {
                throw failure.cause;
            }
            // Always retry on the first attempt if failure was caused by a stale session in the pool
            if (attempt == 1 && failure.isInvalidSessionException) {
                debug("Initial session received from pool is invalid. Retrying...");
                continue;
            }
            if (attempt > activeRetryConfig.getRetryLimit()) {
                throw failure.cause;
            }

            const computeBackoff: BackoffFunction = activeRetryConfig.getBackoffFunction();
            const requestedDelay: number = computeBackoff(attempt, failure.cause, failure.transactionId);
            // A missing or negative delay means "retry immediately".
            const waitMs: number = (requestedDelay == null || requestedDelay < 0) ? 0 : requestedDelay;
            // The finally block runs after this catch body, so the permit is still held during the wait.
            await new Promise(resolve => setTimeout(resolve, waitMs));
            info(`A recoverable error has occurred. Attempting retry #${attempt}.`);
            debug(`Error cause: ${failure.cause}`);
            continue;
        } finally {
            // If nothing went back to the pool (dead or missing session), the next attempt creates a new one.
            forceNewSession = !returnSession(current);
        }
    }
}