in xlml/utils/metric.py [0:0]
def get_xpk_job_status(benchmark_id: str) -> bigquery.JobStatus:
    """Report the job status for the GKE run.

    The status is derived from the current state of the
    ``{benchmark_id}.run_model.wait_for_workload_completion`` task of the
    running DAG, looked up for the current DAG run's logical date.

    SUCCESS - the task's current state equals ``TaskState.SUCCESS.value``
    FAILED - the task is in any other state

    Args:
      benchmark_id: Prefix of the dotted task id to inspect.

    Returns:
      ``bigquery.JobStatus.SUCCESS`` or ``bigquery.JobStatus.FAILED``.
    """
    ctx = get_current_context()
    logical_date = ctx["dag_run"].logical_date
    wait_task = ctx["dag"].get_task(
        task_id=f"{benchmark_id}.run_model.wait_for_workload_completion"
    )
    state = TaskInstance(wait_task, logical_date).current_state()

    is_success = state == TaskState.SUCCESS.value
    if not is_success:
        # Anything other than an explicit success is reported as a failure.
        logging.info(
            "The wait_for_workload_completion state is not success, and the job"
            " status is failed."
        )
        return bigquery.JobStatus.FAILED

    logging.info(
        "The wait_for_workload_completion state is success, and the job status"
        " is success."
    )
    return bigquery.JobStatus.SUCCESS