def try_to_acquire_a_task()

in source/compute_plane/python/agent/agent.py [0:0]


def try_to_acquire_a_task():
    """
    This function will fetch tasks from the SQS queue one at a time. Once is tasks is polled from the queue, then agent
    will try to acquire the task by a conditional write on dymanoDB. The tasks will be acquired if tasks in dynamoDB
    is set as "pending" and the owner is "None"

    Returns:
        A tuple containing the SQS message and the task definition

    Raises:
        Exception: occurs when task acquisition failed

    """
    global AGENT_EXEC_TIMESTAMP_MS

    logging.info("Waiting for a task in the queue...")
    message = tasks_queue.receive_message(wait_time_sec=10)

    task_pick_up_from_sqs_ms = get_time_now_ms()

    logging.info(f"try_to_acquire_a_task, message: {message}")
    # print(len(messages))

    if "body" not in message:
        event_counter_pre.increment("agent_no_messages_in_tasks_queue")
        return None, None

    AGENT_EXEC_TIMESTAMP_MS = get_time_now_ms()

    task = json.loads(message["body"])
    logging.debug(f"try_to_acquire_a_task, task: {task}")

    # Since we read this message from the task queue, now we need to associate
    # message handler with this message, so it is possible to manipulate this message via handler
    task["sqs_handle_id"] = message["properties"]["message_handle_id"]
    try:

        logging.info(f"Calling: {__name__} task_id: {task['task_id']}, agent_id: {SELF_ID}")

        claim_result = state_table.claim_task_for_agent(
            task_id=task["task_id"],
            queue_handle_id=task["sqs_handle_id"],
            agent_id=SELF_ID,
            expiration_timestamp=ttl_gen.generate_next_ttl().get_next_expiration_timestamp()
        )

        logging.info("State Table claim_task_for_agent result: {}".format(claim_result))

    except StateTableException as e:

        if e.caused_by_condition or e.caused_by_throttling:

            event_counter_pre.increment("agent_failed_to_claim_ddb_task")

            if is_task_has_been_cancelled(task["task_id"]):
                logging.info("Task [{}] has been already cancelled, skipping".format(task['task_id']))
                tasks_queue.delete_message(message_handle_id=task["sqs_handle_id"])
                return None, None

            else:

                time.sleep(random.randint(1, 3))
                return None, None

    except Exception as e:
        errlog.log("Unexpected error in claim_task_for_agent {} [{}]".format(
            e, traceback.format_exc()))
        raise e

    # Message should not re-appear in the queue until task is completed
    tasks_queue.change_visibility(message["properties"]["message_handle_id"],
                                  visibility_timeout_sec=agent_task_visibility_timeout_sec)

    task["stats"]["stage3_agent_01_task_acquired_sqs_tstmp"]["tstmp"] = task_pick_up_from_sqs_ms
    task["stats"]["stage3_agent_02_task_acquired_ddb_tstmp"]["tstmp"] = get_time_now_ms()
    event_counter_pre.increment("agent_successful_acquire_a_task")

    return message, task