async executeLambda()

in src/QldbDriver.ts [158:251]


    /**
     * Executes the given lambda against a session taken from the pool (or a freshly created one),
     * retrying when the failure is reported as retryable.
     *
     * Each attempt acquires a semaphore permit, obtains a session, runs the lambda via
     * `session.executeLambda`, and always releases the permit afterwards. A session that is still
     * alive is returned to the pool; otherwise the next attempt is forced to create a new session.
     *
     * @param transactionLambda The function to run; it receives a `TransactionExecutor` and its
     *                          resolved value becomes the result of this method.
     * @param retryConfig Optional retry configuration. When `null`/`undefined`, the driver's own
     *                    `_retryConfig` is used.
     * @returns A promise resolving to the value produced by `transactionLambda`.
     * @throws {DriverClosedError} When the driver has already been closed.
     * @throws {SessionPoolEmptyError} When no semaphore permit can be acquired.
     * @throws The `cause` of an `ExecuteError` when it is not retryable or the retry limit has been
     *         exceeded; any error that is not an `ExecuteError` is rethrown unchanged.
     */
    async executeLambda<T>(
        transactionLambda: (transactionExecutor: TransactionExecutor) => Promise<T>,
        retryConfig?: RetryConfig
    ): Promise<T> {
        if (this._isClosed) {
            throw new DriverClosedError();
        }

        // Takes a permit, then hands out a pooled session or, when none is usable, a new one.
        const checkOutSession = async (forceNew: boolean): Promise<QldbSession> => {
            debug(
                `Getting session. Current free session count: ${this._sessionPool.length}. ` +
                `Currently available permit count: ${this._availablePermits}.`
            );
            if (!this._semaphore.tryAcquire()) {
                throw new SessionPoolEmptyError();
            }
            this._availablePermits--;
            try {
                // Skip the pool entirely when the caller asked for a brand new session.
                const pooled: QldbSession = forceNew ? undefined : this._sessionPool.pop();
                if (pooled != undefined) {
                    return pooled;
                }
                debug("Creating a new session.");
                const communicator: Communicator =
                    await Communicator.create(this._qldbClient, this._ledgerName);
                return new QldbSession(communicator);
            } catch (e) {
                // Hand the permit back before reporting the failure.
                this._semaphore.release();
                this._availablePermits++;

                // Failing to start a new session is always reported as retryable.
                throw new ExecuteError(e, true, true);
            }
        };

        // Gives the permit back and re-pools the session if it is still alive.
        // Returns true only when the session went back into the pool.
        const checkInSession = (used: QldbSession): boolean => {
            if (used == null) {
                // No session was obtained, so there is no permit held by this attempt.
                return false;
            }
            this._semaphore.release();
            this._availablePermits++;
            if (!used.isAlive()) {
                return false;
            }
            this._sessionPool.push(used);
            return true;
        };

        const effectiveConfig: RetryConfig = (retryConfig == null) ? this._retryConfig : retryConfig;
        let forceNewSession: boolean = false;
        let attempt: number = 0;
        while (true) {
            attempt++;
            let current: QldbSession = null;
            try {
                current = await checkOutSession(forceNewSession);
                return await current.executeLambda(transactionLambda);
            } catch (e) {
                if (!(e instanceof ExecuteError)) {
                    throw e;
                }
                if (!e.isRetryable) {
                    throw e.cause;
                }
                // A stale pooled session on the very first attempt is retried immediately,
                // without consulting the retry limit or backing off.
                if (attempt == 1 && e.isInvalidSessionException) {
                    debug("Initial session received from pool is invalid. Retrying...");
                    continue;
                }
                if (attempt > effectiveConfig.getRetryLimit()) {
                    throw e.cause;
                }

                const computeBackoff: BackoffFunction = effectiveConfig.getBackoffFunction();
                const requestedDelay: number = computeBackoff(attempt, e.cause, e.transactionId);
                // Missing or negative delays are treated as "do not wait".
                const delayMs: number = (requestedDelay == null || requestedDelay < 0) ? 0 : requestedDelay;
                await new Promise(resolve => setTimeout(resolve, delayMs));

                info(`A recoverable error has occurred. Attempting retry #${attempt}.`);
                debug(`Error cause: ${e.cause}`);
                // Falls through to the finally block and then the next loop iteration.
            } finally {
                // Runs on return, throw and continue alike; a session that could not be
                // re-pooled forces the next attempt to create a new one.
                forceNewSession = !checkInSession(current);
            }
        }
    }