def handler()

in backend/lambdas/tasks/work_query_queue.py [0:0]


def handler(event, context):
    """Run one polling cycle over the query queue for a job.

    Loads the executions listed in ``event["RunningExecutions"]["Data"]``
    and sorts them by their ``status``: ``"SUCCEEDED"`` entries are handed
    to ``clear_completed``, ``"RUNNING"`` entries are carried forward, and
    anything else is treated as failed.  Once a failure has been seen (now
    or in a previous cycle, via ``IsFailing``) no new work is started, and
    ``abandon_execution`` is called as soon as nothing is running anymore.
    Otherwise, up to the remaining concurrency is read from the queue and
    one state machine execution is started per message.

    Args:
        event: Mapping that must contain ``ExecutionId`` and
            ``ExecutionName``.  Optional keys: ``AthenaConcurrencyLimit``
            (default 15), ``QueryExecutionWaitSeconds`` (default 15),
            ``AthenaQueryMaxRetries`` (default 2) and ``RunningExecutions``
            (default: no executions).
        context: Not used by this handler.

    Returns:
        A dict with ``IsFailing``, ``Data`` (one
        ``{"ExecutionArn", "ReceiptHandle"}`` entry per execution that is
        running or was just started) and ``Total`` (the length of ``Data``).

    Raises:
        NotImplementedError: If a queued message names a ``QueryExecutor``
            other than ``"athena"``.
    """
    max_concurrent = int(event.get("AthenaConcurrencyLimit", 15))
    wait_seconds = int(event.get("QueryExecutionWaitSeconds", 15))
    retries_left = int(event.get("AthenaQueryMaxRetries", 2))
    parent_execution_id = event["ExecutionId"]
    job_id = event["ExecutionName"]
    prior_state = event.get("RunningExecutions", {"Data": [], "Total": 0})

    # Load everything first, then bucket each execution by its status.
    loaded = [load_execution(item) for item in prior_state["Data"]]
    succeeded, still_running, failed = [], [], []
    for execution in loaded:
        status = execution["status"]
        if status == "SUCCEEDED":
            succeeded.append(execution)
        elif status == "RUNNING":
            still_running.append(execution)
        else:
            failed.append(execution)

    clear_completed(succeeded)

    # A failure is sticky: it is carried over from earlier cycles via the
    # "IsFailing" flag and raised again by any failure seen in this cycle.
    carried_failure = prior_state.get("IsFailing", False)
    is_failing = True if failed else carried_failure

    # Let in-flight queries finish before abandoning because of failures.
    if is_failing and not still_running:
        abandon_execution(failed)

    # New queries are only scheduled while no error has been observed.
    free_slots = max_concurrent - len(still_running)
    if free_slots > 0 and not is_failing:
        launched = []
        for message in read_queue(queue, free_slots):
            payload = json.loads(message.body)
            payload["AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID"] = parent_execution_id
            payload["JobId"] = job_id
            payload["WaitDuration"] = wait_seconds
            payload["ExecutionRetriesLeft"] = retries_left
            executor_name = payload["QueryExecutor"]
            if executor_name != "athena":
                raise NotImplementedError(
                    "Unsupported query executor: '{}'".format(executor_name)
                )
            response = sf_client.start_execution(
                stateMachineArn=state_machine_arn, input=json.dumps(payload)
            )
            # Keep the receipt handle next to the execution details so the
            # originating queue message can be identified in later cycles.
            launched.append({**response, "ReceiptHandle": message.receipt_handle})
        still_running.extend(launched)

    tracked = [
        {
            "ExecutionArn": execution["executionArn"],
            "ReceiptHandle": execution["ReceiptHandle"],
        }
        for execution in still_running
    ]
    return {
        "IsFailing": is_failing,
        "Data": tracked,
        "Total": len(still_running),
    }