in qldbdriver/qldbdriver.go [108:173]
// Execute runs fn inside a transaction on a session obtained from the driver
// and returns whatever fn produced once the session reports success.
//
// A failed attempt is retried when the transaction error is flagged as
// retryable and the number of attempts so far is below
// driver.retryPolicy.MaxRetryLimit; between retries the call waits for the
// delay computed by driver.retryPolicy.Backoff. An invalid-session error on the
// very first attempt is retried immediately with a freshly created session and
// without any backoff delay.
//
// It returns an error if the driver is closed, if a session cannot be obtained
// or created, or — once no further retry is allowed — the unwrapped transaction
// error.
func (driver *QLDBDriver) Execute(ctx context.Context, fn func(txn Transaction) (interface{}, error)) (interface{}, error) {
	if driver.isClosed {
		return nil, &qldbDriverError{"Cannot invoke methods on a closed QLDBDriver."}
	}

	current, err := driver.getSession(ctx)
	if err != nil {
		return nil, err
	}

	attempt := 0
	for {
		result, failure := current.execute(ctx, fn)
		if failure == nil {
			// Success: hand the session back and report fn's result.
			driver.releaseSession(current)
			return result, nil
		}

		switch {
		case failure.canRetry && failure.isISE && attempt == 0:
			// If initial session is invalid, always retry once. This path skips
			// both the retry-limit check and the backoff delay below.
			driver.logger.log(LogDebug, "Initial session received from pool invalid. Retrying...")
			if current, err = driver.createSession(ctx); err != nil {
				return nil, err
			}
			attempt++
			continue

		case !failure.canRetry || attempt >= driver.retryPolicy.MaxRetryLimit:
			// Do not retry. The session is only handed back via releaseSession
			// when the abort succeeded; otherwise just the semaphore is released
			// (presumably the session is discarded — confirm against
			// releaseSession).
			if failure.abortSuccess {
				driver.releaseSession(current)
			} else {
				driver.semaphore.release()
			}
			return nil, failure.unwrap()
		}

		// Retry.
		attempt++
		driver.logger.logf(LogInfo, "A recoverable error has occurred. Attempting retry #%d.", attempt)
		driver.logger.logf(LogDebug, "Errored Transaction ID: %s. Error cause: '%v'", failure.transactionID, failure)

		switch {
		case failure.isISE:
			// Invalid session: swap in a newly created one. The semaphore is
			// not released here, unlike the branch below.
			driver.logger.log(LogDebug, "Replacing expired session...")
			if current, err = driver.createSession(ctx); err != nil {
				return nil, err
			}

		case !failure.abortSuccess:
			// The abort did not succeed, so stop using this session: release
			// the semaphore first, then ask the driver for another session.
			driver.logger.log(LogDebug, "Retrying with a different session...")
			driver.semaphore.release()
			if current, err = driver.getSession(ctx); err != nil {
				return nil, err
			}
		}
		// When the abort succeeded on a non-ISE error, the same session is reused.

		sleepWithContext(ctx, driver.retryPolicy.Backoff.Delay(attempt))
	}
}