in pyqldb/driver/qldb_driver.py [0:0]
def execute_lambda(self, query_lambda, retry_config=None):
"""
Execute the lambda function against QLDB within a transaction and retrieve the result. It will retry up to the
retry limit if an OCC conflict or retryable exception occurs.
This is the primary method to execute a transaction against Amazon QLDB ledger.
:type query_lambda: function
:param query_lambda: The lambda function to execute. The function receives an instance of
:py:class:`pyqldb.execution.executor.Executor` which can be used to execute statements.
The instance of :py:class:`pyqldb.execution.executor.Executor` wraps an implicitly created
transaction. The transaction will be implicitly committed when the passed function returns.
The lambda function cannot have any side effects as it may be invoked multiple
times, and the result cannot be trusted until the transaction is committed.
:type retry_config: :py:class:`pyqldb.config.retry_config.RetryConfig`
:param retry_config: Config to specify max number of retries, base and custom backoff strategy for retries.
This config overrides the retry config set at driver level for a particular lambda
execution.
Note that all the values of the driver level retry config will be overwritten by the new
config passed here.
:rtype: :py:class:`pyqldb.cursor.buffered_cursor.BufferedCursor`/object
:return: The return value of the lambda function which could be a
:py:class:`pyqldb.cursor.buffered_cursor.BufferedCursor` on the result set of a statement within the
lambda.
:raises DriverClosedError: When this driver is closed.
:raises IllegalStateError: When the commit digest from commit transaction result does not match.
:raises ClientError: When there is an error executing against QLDB.
:raises LambdaAbortedError: If the lambda function calls :py:class:`pyqldb.execution.executor.Executor.abort`.
"""
if self._is_closed:
raise DriverClosedError
retry_config = self._retry_config if retry_config is None else retry_config
start_new_session = False
session = None
retry_attempt = 0
while True:
try:
retry_attempt += 1
session = self._get_session(start_new_session)
return session._execute_lambda(query_lambda)
except Exception as e:
if isinstance(e, ExecuteError):
if e.is_retryable:
# Always retry on the first attempt if failure was caused by a stale session in the pool.
if retry_attempt == 1 and e.is_invalid_session_exception:
continue
if retry_attempt > retry_config.retry_limit:
raise e.error
self._retry_sleep(retry_config, retry_attempt, e.error, e.transaction_id)
logger.info('A recoverable error has occurred. Attempting retry #{}'.format(retry_attempt))
logger.debug('Error cause: {}'.format(e.error))
else:
raise e.error
else:
raise e
finally:
start_new_session = not self._release_session(session)