# backend/lambdas/tasks/work_query_queue.py
def handler(event, context):
    """Drive the Athena work queue: reap finished queries and start new ones.

    Loads the status of previously started query executions, clears the
    succeeded ones from the queue, and — once nothing is left running — abandons
    the job if any execution failed. While healthy, reads up to the remaining
    concurrency capacity from the queue and starts a Step Functions execution
    per message.

    Args:
        event: Step Functions state input. Keys used:
            - AthenaConcurrencyLimit (optional, default 15)
            - QueryExecutionWaitSeconds (optional, default 15)
            - AthenaQueryMaxRetries (optional, default 2)
            - ExecutionId, ExecutionName (required)
            - RunningExecutions (optional: {"Data": [...], "Total": n,
              "IsFailing": bool})
        context: Lambda context object (unused).

    Returns:
        dict: {"IsFailing": bool, "Data": [{"ExecutionArn", "ReceiptHandle"}],
        "Total": int} describing executions still in flight.

    Raises:
        NotImplementedError: a queued message names a query executor other
            than "athena".
    """
    concurrency_limit = int(event.get("AthenaConcurrencyLimit", 15))
    wait_duration = int(event.get("QueryExecutionWaitSeconds", 15))
    execution_retries_left = int(event.get("AthenaQueryMaxRetries", 2))
    execution_id = event["ExecutionId"]
    job_id = event["ExecutionName"]
    previously_started = event.get("RunningExecutions", {"Data": [], "Total": 0})
    executions = [load_execution(execution) for execution in previously_started["Data"]]
    succeeded = [e for e in executions if e["status"] == "SUCCEEDED"]
    still_running = [e for e in executions if e["status"] == "RUNNING"]
    # Anything neither succeeded nor running (FAILED, TIMED_OUT, ABORTED, ...)
    failed = [e for e in executions if e["status"] not in ("SUCCEEDED", "RUNNING")]
    clear_completed(succeeded)
    # Once failing, stay failing for the rest of the job
    is_failing = previously_started.get("IsFailing", False) or bool(failed)
    # Only abandon for failures once all running queries are done
    if is_failing and not still_running:
        abandon_execution(failed)
    remaining_capacity = concurrency_limit - len(still_running)
    # Only schedule new queries if there have been no errors
    if remaining_capacity > 0 and not is_failing:
        msgs = read_queue(queue, remaining_capacity)
        started = []
        for msg in msgs:
            body = json.loads(msg.body)
            body["AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID"] = execution_id
            body["JobId"] = job_id
            body["WaitDuration"] = wait_duration
            body["ExecutionRetriesLeft"] = execution_retries_left
            query_executor = body["QueryExecutor"]
            if query_executor == "athena":
                resp = sf_client.start_execution(
                    stateMachineArn=state_machine_arn, input=json.dumps(body)
                )
                # Keep the receipt handle so the message can be deleted later
                started.append({**resp, "ReceiptHandle": msg.receipt_handle})
            else:
                raise NotImplementedError(
                    f"Unsupported query executor: '{query_executor}'"
                )
        still_running += started
    return {
        "IsFailing": is_failing,
        "Data": [
            {"ExecutionArn": e["executionArn"], "ReceiptHandle": e["ReceiptHandle"]}
            for e in still_running
        ],
        "Total": len(still_running),
    }