def get_xpk_job_status()

in xlml/utils/metric.py [0:0]


def get_xpk_job_status(benchmark_id: str) -> bigquery.JobStatus:
  """Derive the job status of a GKE run from its completion task.

  The status is decided by the current state of the task named
  `<benchmark_id>.run_model.wait_for_workload_completion`, looked up in the
  DAG found in the current context and evaluated at that context's DAG-run
  logical date.

  Args:
    benchmark_id: Prefix used to build the id of the
      `run_model.wait_for_workload_completion` task to inspect.

  Returns:
    bigquery.JobStatus.SUCCESS when the task's current state equals
    TaskState.SUCCESS.value; bigquery.JobStatus.FAILED for every other state.
  """
  run_context = get_current_context()
  logical_date = run_context["dag_run"].logical_date
  dag = run_context["dag"]

  wait_task_id = f"{benchmark_id}.run_model.wait_for_workload_completion"
  wait_task = dag.get_task(task_id=wait_task_id)
  wait_state = TaskInstance(wait_task, logical_date).current_state()
  workload_succeeded = wait_state == TaskState.SUCCESS.value

  # Anything other than an explicit success is reported as a failure.
  if not workload_succeeded:
    logging.info(
        "The wait_for_workload_completion state is not success, and the job"
        " status is failed."
    )
    return bigquery.JobStatus.FAILED

  logging.info(
      "The wait_for_workload_completion state is success, and the job status"
      " is success."
  )
  return bigquery.JobStatus.SUCCESS