in packages/cdk/lib/wes_adapter/amazon_genomics/wes/adapters/CromwellWESAdapter.py [0:0]
def _build_run_log_dict_(self, metadata, outputs):
run_id = metadata.get("id")
status = metadata["status"]
if (status == "fail") or (run_id is None):
return {
"run_id": run_id,
"state": self._get_workflow_state_(status=status),
"request": {
"workflow_params": self.workflow_params,
"workflow_type": None,
"workflow_type_version": None,
},
}
submitted_files = metadata["submittedFiles"]
workflow_type = submitted_files["workflowType"]
workflow_type_version = submitted_files["workflowTypeVersion"]
workflow_url = submitted_files["workflowUrl"]
run_request = {
"workflow_params": self.workflow_params,
"workflow_type": workflow_type,
"workflow_type_version": workflow_type_version,
"workflow_url": workflow_url,
}
task_logs = []
calls = metadata["calls"]
for task_name in calls.keys():
for task in calls[task_name]:
log = {
"name": task_name + "|" + task.get("jobId", "XXXXX"),
"cmd": [task.get("commandLine")],
"start_time": task.get("start"),
"end_time": task.get("end"),
"stdout": task.get("stdout"),
"stderr": task.get("stderr"),
"exit_code": ("" if task.get("returnCode") == None else str(task.get("returnCode"))),
}
task_logs.append(log)
run_log_dict = {
"run_id": run_id,
"request": run_request,
"state": self._get_workflow_state_(status=status),
"task_logs": task_logs,
"outputs": outputs,
}
return run_log_dict