in source/compute_plane/python/agent/agent.py [0:0]
def try_to_acquire_a_task():
    """
    Poll a single task from the SQS tasks queue and try to claim it in the state table.

    One message is received from the tasks queue (waiting up to 10 seconds). If a
    message arrives, the agent tries to acquire the task through
    ``state_table.claim_task_for_agent``, which performs a conditional write on
    DynamoDB (per the original design, the claim only succeeds when the task is
    "pending" and its owner is "None"). On success, the message's visibility
    timeout is changed to ``agent_task_visibility_timeout_sec`` so the message
    should not re-appear in the queue while the task is being processed.

    Returns:
        A tuple ``(message, task)`` containing the SQS message and the decoded
        task definition when the task was acquired, or ``(None, None)`` when no
        message was received or the claim was rejected by the condition check /
        throttled.

    Raises:
        StateTableException: if the claim failed for a reason other than a
            failed condition or throttling.
        Exception: any other unexpected error raised while claiming the task
            (logged via ``errlog`` before being re-raised).
    """
    global AGENT_EXEC_TIMESTAMP_MS

    logging.info("Waiting for a task in the queue...")
    message = tasks_queue.receive_message(wait_time_sec=10)
    task_pick_up_from_sqs_ms = get_time_now_ms()
    logging.info(f"try_to_acquire_a_task, message: {message}")

    # A message without a body means nothing was received from the queue.
    if "body" not in message:
        event_counter_pre.increment("agent_no_messages_in_tasks_queue")
        return None, None

    AGENT_EXEC_TIMESTAMP_MS = get_time_now_ms()

    task = json.loads(message["body"])
    logging.debug(f"try_to_acquire_a_task, task: {task}")

    # Since we read this message from the task queue, now we need to associate
    # message handler with this message, so it is possible to manipulate this message via handler
    task["sqs_handle_id"] = message["properties"]["message_handle_id"]

    try:
        logging.info(f"Calling: {__name__} task_id: {task['task_id']}, agent_id: {SELF_ID}")
        claim_result = state_table.claim_task_for_agent(
            task_id=task["task_id"],
            queue_handle_id=task["sqs_handle_id"],
            agent_id=SELF_ID,
            expiration_timestamp=ttl_gen.generate_next_ttl().get_next_expiration_timestamp()
        )
        logging.info("State Table claim_task_for_agent result: {}".format(claim_result))

    except StateTableException as e:
        if not (e.caused_by_condition or e.caused_by_throttling):
            # The claim did not succeed for an unexpected reason. Falling through
            # here would treat an unclaimed task as acquired, so surface the error.
            errlog.log("Unexpected error in claim_task_for_agent {} [{}]".format(
                e, traceback.format_exc()))
            raise

        # The conditional write was rejected or the request was throttled:
        # this agent does not own the task.
        event_counter_pre.increment("agent_failed_to_claim_ddb_task")

        if is_task_has_been_cancelled(task["task_id"]):
            # A cancelled task will never run, so drop its message from the queue.
            logging.info("Task [{}] has been already cancelled, skipping".format(task['task_id']))
            tasks_queue.delete_message(message_handle_id=task["sqs_handle_id"])
            return None, None

        # Leave the message in the queue and pause 1-3 seconds before returning,
        # presumably to back off before the next polling attempt.
        time.sleep(random.randint(1, 3))
        return None, None

    except Exception as e:
        errlog.log("Unexpected error in claim_task_for_agent {} [{}]".format(
            e, traceback.format_exc()))
        raise

    # Message should not re-appear in the queue until task is completed
    tasks_queue.change_visibility(message["properties"]["message_handle_id"],
                                  visibility_timeout_sec=agent_task_visibility_timeout_sec)

    task["stats"]["stage3_agent_01_task_acquired_sqs_tstmp"]["tstmp"] = task_pick_up_from_sqs_ms
    task["stats"]["stage3_agent_02_task_acquired_ddb_tstmp"]["tstmp"] = get_time_now_ms()

    event_counter_pre.increment("agent_successful_acquire_a_task")
    return message, task