func()

in qldbdriver/qldbdriver.go [108:173]


// Execute runs fn in a transaction on a session obtained from the driver and
// returns whatever fn's transaction produced.
//
// A closed driver is rejected immediately with a qldbDriverError. Otherwise a
// session is fetched with getSession and session.execute is invoked in a loop:
//
//   - On success the session is handed back through releaseSession and the
//     result is returned.
//   - If the very first attempt fails with a retryable error flagged isISE
//     (presumably an invalid/expired session, judging by the log text), a new
//     session is created and the call is retried at once, without backoff.
//   - If the error is not retryable, or the number of attempts has reached
//     retryPolicy.MaxRetryLimit, the unwrapped error is returned.
//   - Otherwise the attempt counter is bumped, the session is replaced when
//     needed, and the loop retries after waiting retryPolicy.Backoff.Delay.
//
// NOTE(review): the error returns after createSession/getSession do not release
// anything here; presumably those helpers give back the permit on failure —
// confirm against their implementations.
func (driver *QLDBDriver) Execute(ctx context.Context, fn func(txn Transaction) (interface{}, error)) (interface{}, error) {
	if driver.isClosed {
		return nil, &qldbDriverError{"Cannot invoke methods on a closed QLDBDriver."}
	}

	session, err := driver.getSession(ctx)
	if err != nil {
		return nil, err
	}

	var (
		result  interface{}
		txnErr  *txnError
		attempt int
	)
	for {
		result, txnErr = session.execute(ctx, fn)
		if txnErr == nil {
			// Happy path: hand the session back and report the result.
			driver.releaseSession(session)
			return result, nil
		}

		// A session that is already invalid on the first attempt always gets
		// one immediate retry on a freshly created session.
		if attempt == 0 && txnErr.canRetry && txnErr.isISE {
			driver.logger.log(LogDebug, "Initial session received from pool invalid. Retrying...")
			if session, err = driver.createSession(ctx); err != nil {
				return nil, err
			}
			attempt++
			continue
		}

		// Give up when the error is terminal or the retry budget is spent.
		mayRetry := txnErr.canRetry && attempt < driver.retryPolicy.MaxRetryLimit
		if !mayRetry {
			if txnErr.abortSuccess {
				driver.releaseSession(session)
			} else {
				// The abort did not succeed, so the session is not handed to
				// releaseSession; only the semaphore permit is released.
				driver.semaphore.release()
			}
			return nil, txnErr.unwrap()
		}

		attempt++
		driver.logger.logf(LogInfo, "A recoverable error has occurred. Attempting retry #%d.", attempt)
		driver.logger.logf(LogDebug, "Errored Transaction ID: %s. Error cause: '%v'", txnErr.transactionID, txnErr)

		// Choose the session for the next attempt. When the abort succeeded
		// and the error is not an ISE, the current session is simply reused.
		switch {
		case txnErr.isISE:
			driver.logger.log(LogDebug, "Replacing expired session...")
			if session, err = driver.createSession(ctx); err != nil {
				return nil, err
			}
		case !txnErr.abortSuccess:
			driver.logger.log(LogDebug, "Retrying with a different session...")
			driver.semaphore.release()
			if session, err = driver.getSession(ctx); err != nil {
				return nil, err
			}
		}

		// Back off before the next attempt; the wait is bound to ctx.
		sleepWithContext(ctx, driver.retryPolicy.Backoff.Delay(attempt))
	}
}