in source/compute_plane/python/agent/agent.py [0:0]
def process_subprocess_completion(perf_tracker, task, sqs_msg, fname_stdout, stdout=None):
    """
    Persist the output of a completed task and mark the task as finished.

    The function stamps the task statistics, stores the captured stdout through
    ``stdout_iom``, sets the task status to finished in the state table
    (DynamoDB), retrying for as long as the update is throttled, and, when the
    status update succeeds, deletes the associated SQS message. Agent
    measurements are submitted at the end in every non-raising case.

    Args:
        perf_tracker (utils.performance_tracker.PerformanceTracker): endpoint for sending metrics
        task (dict): the task that went to completion; ``task["task_id"]`` is read
            and ``task["stats"]`` timestamps are updated in place
        sqs_msg (Message): the SQS message associated to the completed task; its
            ``["properties"]["message_handle_id"]`` is used to delete it
        fname_stdout (str): name of the file where stdout was redirected; used
            only when ``stdout`` is None
        stdout (str): the stdout of the execution; when provided it is
            base64-encoded and stored instead of the content of ``fname_stdout``

    Returns:
        Nothing

    Raises:
        StateTableException: if the status update fails for a reason other than
            throttling or a failed condition
        Exception: any other unexpected error raised by the status update
    """
    task_id = task["task_id"]
    task["stats"]["stage4_agent_01_user_code_finished_tstmp"]["tstmp"] = get_time_now_ms()

    # <1.> Store stdout into persistent storage.
    # Only stdout is persisted here; the stderr upload is currently disabled.
    if stdout is not None:
        b64output = base64.b64encode(stdout.encode("utf-8"))
        stdout_iom.put_output_from_bytes(task_id, data=b64output)
    else:
        stdout_iom.put_output_from_file(task_id, file_name=fname_stdout)

    task["stats"]["stage4_agent_02_S3_stdout_delivered_tstmp"]["tstmp"] = get_time_now_ms()

    # <2.> Mark the task as finished, retrying for as long as we are throttled.
    count = 0
    is_update_successful = False
    while True:
        count += 1
        time_start_ms = get_time_now_ms()

        try:
            is_update_successful = state_table.update_task_status_to_finished(
                task_id=task_id,
                agent_id=SELF_ID
            )
            logging.info(f"Task status has been set to Finished: {task['task_id']}")
            break

        except StateTableException as e:
            if e.caused_by_throttling:
                time_end_ms = get_time_now_ms()
                errlog.log(f"Agent FINISHED@StateTable #{count} Throttling for {time_end_ms - time_start_ms} ms")
                continue  # i.e., retry again

            if e.caused_by_condition:
                errlog.log("Agent FINISHED@StateTable exception caused_by_condition")
                is_update_successful = False
                break

            # Neither throttling nor a failed condition: previously this case fell
            # through and was retried silently forever. Treat it like any other
            # unexpected failure: log it and propagate.
            errlog.log(
                f"Agent FINISHED@StateTable #{count} unexpected StateTableException {e} "
                f"[{traceback.format_exc()}]"
            )
            raise

        except Exception as e:
            errlog.log(f"Unexpected Exception while setting tasks state to finished {e} [{traceback.format_exc()}]")
            raise

    # <3.> Acknowledge the SQS message only if the state table accepted the update.
    if not is_update_successful:
        # We can get here if task has been taken over by the watchdog lambda
        # in this case we ignore results and proceed to the next task.
        event_counter_post.increment("ddb_set_task_finished_failed")
        logging.warning(f"Could not set completion state for a task {task['task_id']} to Finish")
    else:
        event_counter_post.increment("ddb_set_task_finished_succeeded")
        logging.info(
            "We have successfully marked task as completed in dynamodb."
            " Deleting message from the SQS... for task [{}]".format(
                task["task_id"]))

        # NOTE(review): the message is deleted in the success branch only, as the
        # log message above implies -- confirm against the deployed version.
        tasks_queue.delete_message(sqs_msg["properties"]["message_handle_id"])

    # <4.> Report timing and submit the measurements collected for this task.
    # Compute the elapsed time once so the log line and the metric agree.
    agent_total_time_ms = get_time_now_ms() - AGENT_EXEC_TIMESTAMP_MS
    logging.info("Exec time1: {} {}".format(agent_total_time_ms, AGENT_EXEC_TIMESTAMP_MS))
    event_counter_post.increment("agent_total_time_ms", agent_total_time_ms)
    event_counter_post.set("str_pod_id", SELF_ID)

    submit_post_agent_measurements(task, perf_tracker)