in lambdas/helper/dal.py [0:0]
def batch_execute_statement(self, sql_stmt, sql_param_sets, batch_size, transaction_id=None):
    """Run ``sql_stmt`` against every parameter set, sending them in batches.

    ``sql_param_sets`` is sliced into consecutive chunks of at most
    ``batch_size`` entries and each chunk is passed to
    ``self._rdsdata_client.batch_execute_statement`` as ``parameterSets``.

    Args:
        sql_stmt: SQL text sent as the ``sql`` request field.
        sql_param_sets: Sliceable sequence of parameter sets; may be empty.
        batch_size: Maximum number of parameter sets per client call.
            Must be a positive integer.
        transaction_id: When not ``None``, forwarded as ``transactionId``
            on every call.

    Returns:
        A list with one client response per batch, in submission order.
        The list is empty when ``sql_param_sets`` is empty.

    Raises:
        DataAccessLayerException: Wraps any exception raised while batching
            or calling the client, including an invalid ``batch_size``.
    """
    param_suffix = f' with parameters: {sql_param_sets}' if len(sql_param_sets) > 0 else ''
    logger.debug(f'Running SQL statement: {sql_stmt}{param_suffix}')
    DataAccessLayer._xray_start('batch_execute_statement')
    try:
        # Validated inside the try so callers still see DataAccessLayerException.
        if batch_size <= 0:
            raise ValueError(f'batch_size must be a positive integer, got {batch_size}')
        total = len(sql_param_sets)
        # Ceiling division: an exact multiple must not count a trailing empty batch.
        num_batches = -(-total // batch_size)
        results = []
        for batch_no, start_idx in enumerate(range(0, total, batch_size), start=1):
            # Slicing clamps at the end of the sequence, so the last batch may be short.
            batch_sql_param_sets = sql_param_sets[start_idx:start_idx + batch_size]
            logger.debug(f'Running SQL statement: [batch #{batch_no}/{num_batches}, batch size {batch_size}, SQL: {sql_stmt}]')
            DataAccessLayer._xray_add_metadata('sql_statement', sql_stmt)
            request = {
                'secretArn': self._db_credentials_secrets_store_arn,
                'database': self._database_name,
                'resourceArn': self._db_cluster_arn,
                'sql': sql_stmt,
                'parameterSets': batch_sql_param_sets
            }
            if transaction_id is not None:
                request['transactionId'] = transaction_id
            results.append(self._rdsdata_client.batch_execute_statement(**request))
    except Exception as e:
        logger.debug(f'Error running SQL statement (error class: {e.__class__})')
        raise DataAccessLayerException(e) from e
    else:
        # Record every batch response; `results` is always bound, even when
        # there was nothing to execute.
        DataAccessLayer._xray_add_metadata('rdsdata_executesql_result', json.dumps(results))
        return results
    finally:
        DataAccessLayer._xray_stop()