def get_job_status()

in functions/data-processing-engines/dataproc-serverless-job-executor/main.py [0:0]


def get_job_status(job_id, extracted_params):
    """
    gets the status of a dataproc serverless job

    Args:
        request_json (dict) : event dictionary
    Returns:
        str: status of the dataproc serverless batch job
    """

    dataproc_serverless_project_id = extracted_params.get('dataproc_serverless_project_id')
    dataproc_serverless_region = extracted_params.get('dataproc_serverless_region')

    credentials.refresh(Request())
    headers = {"Authorization": f"Bearer {credentials.token}"}

    url = (f"https://dataproc.googleapis.com/v1/projects/{dataproc_serverless_project_id}/"
           f"locations/{dataproc_serverless_region}/batches/{job_id}")
    print("Url::::" + url)

    response = requests.get(url, headers=headers)

    if response.status_code == 200:
        print("response::" + str(response))
        return response.json().get("state")
    else:
        error_message = f"Dataproc API GET request failed. Status code:{response.status_code}"
        print(error_message)
        print(response.text)
        raise Exception(error_message)