def process_subprocess_completion()

in source/compute_plane/python/agent/agent.py [0:0]


def process_subprocess_completion(perf_tracker, task, sqs_msg, fname_stdout, stdout=None):
    """
    Finalize a task whose user code went to completion.

    The function stores the stdout of the execution into persistent storage, marks the task as
    finished in the state table (retrying for as long as the update is throttled), deletes the
    associated SQS message when the state update succeeded, and finally submits the agent
    measurements.

    Args:
        perf_tracker (utils.performance_tracker.PerformanceTracker): endpoint for sending metrics
        task (dict): the task that went to completion; its "stats" timestamps are updated in place
        sqs_msg (Message): the SQS message associated to the completed task
        fname_stdout (file): the file where stdout was redirected; only used when stdout is None
        stdout (str): the stdout of the execution; when provided it is base64-encoded and stored
            instead of the content of fname_stdout

    Returns:
        Nothing

    Raises:
        StateTableException: if the state update fails for a reason that is neither throttling
            nor a failed condition
        Exception: any other unexpected error raised by the state update is logged and re-raised

    """
    task["stats"]["stage4_agent_01_user_code_finished_tstmp"]["tstmp"] = get_time_now_ms()

    task_id = task["task_id"]

    # <1.> Store stdout into persistent storage
    if stdout is not None:
        b64output = base64.b64encode(stdout.encode("utf-8"))
        stdout_iom.put_output_from_bytes(task_id, data=b64output)
    else:
        stdout_iom.put_output_from_file(task_id, file_name=fname_stdout)

    task["stats"]["stage4_agent_02_S3_stdout_delivered_tstmp"]["tstmp"] = get_time_now_ms()

    # <2.> Mark the task as finished, retrying only while the state table is throttling us
    count = 0
    is_update_successful = False
    while True:
        count += 1
        time_start_ms = get_time_now_ms()

        try:
            is_update_successful = state_table.update_task_status_to_finished(
                task_id=task_id,
                agent_id=SELF_ID
            )

            logging.info(f"Task status has been set to Finished: {task_id}")

            break

        except StateTableException as e:

            if e.caused_by_throttling:

                time_end_ms = get_time_now_ms()

                errlog.log(f"Agent FINISHED@StateTable #{count} Throttling for {time_end_ms - time_start_ms} ms")

                continue  # i.e., retry again

            if e.caused_by_condition:

                errlog.log("Agent FINISHED@StateTable exception caused_by_condition")

                is_update_successful = False

                break

            # Neither throttling nor a failed condition: without this the loop would silently
            # retry forever on an error we do not know how to recover from.
            errlog.log(f"Agent FINISHED@StateTable unexpected StateTableException {e} [{traceback.format_exc()}]")
            raise

        except Exception as e:
            errlog.log(f"Unexpected Exception while setting tasks state to finished {e} [{traceback.format_exc()}]")
            raise

    # <3.> Acknowledge the SQS message only if the state update went through
    if not is_update_successful:
        # We can get here if task has been taken over by the watchdog lambda
        # in this case we ignore results and proceed to the next task.
        event_counter_post.increment("ddb_set_task_finished_failed")
        logging.warning(f"Could not set completion state for a task {task_id} to Finish")

    else:
        event_counter_post.increment("ddb_set_task_finished_succeeded")
        logging.info(
            "We have successfully marked task as completed in dynamodb."
            " Deleting message from the SQS... for task [{}]".format(task_id))
        tasks_queue.delete_message(sqs_msg["properties"]["message_handle_id"])

    # <4.> Report timings; read the clock once so the log line and the metric agree
    agent_total_time_ms = get_time_now_ms() - AGENT_EXEC_TIMESTAMP_MS
    logging.info("Exec time1: {} {}".format(agent_total_time_ms, AGENT_EXEC_TIMESTAMP_MS))
    event_counter_post.increment("agent_total_time_ms", agent_total_time_ms)
    event_counter_post.set("str_pod_id", SELF_ID)

    submit_post_agent_measurements(task, perf_tracker)