in functions/data-processing-engines/dataproc-serverless-job-executor/main.py [0:0]
def get_job_status(job_id, extracted_params):
    """
    Gets the status of a Dataproc Serverless batch job via the Dataproc REST API.

    Relies on the names ``credentials``, ``Request`` and ``requests`` being
    available in the enclosing module scope (they are not defined in this
    function).

    Args:
        job_id (str): identifier of the batch, used as the last path segment
            of the request URL.
        extracted_params (dict): parameters dictionary; the keys
            'dataproc_serverless_project_id' and 'dataproc_serverless_region'
            are read to build the request URL.

    Returns:
        The value of the "state" field of the JSON response, or None when the
        response body has no such field.

    Raises:
        Exception: if the API answers with a status code other than 200.
            Transport-level errors (including the timeout set below) are
            propagated as raised by ``requests.get``.
    """
    dataproc_serverless_project_id = extracted_params.get('dataproc_serverless_project_id')
    dataproc_serverless_region = extracted_params.get('dataproc_serverless_region')

    # Refresh the module-level credentials so the bearer token is current.
    credentials.refresh(Request())
    headers = {"Authorization": f"Bearer {credentials.token}"}

    url = (f"https://dataproc.googleapis.com/v1/projects/{dataproc_serverless_project_id}/"
           f"locations/{dataproc_serverless_region}/batches/{job_id}")
    print("Url::::" + url)

    # Without a timeout, requests waits forever on a stalled connection and the
    # caller would hang; bound the wait explicitly (seconds).
    response = requests.get(url, headers=headers, timeout=60)

    if response.status_code != 200:
        error_message = f"Dataproc API GET request failed. Status code:{response.status_code}"
        print(error_message)
        print(response.text)
        raise Exception(error_message)

    print("response::" + str(response))
    return response.json().get("state")