in cloud-composer/dags/sample-rideshare-run-data-quality.py [0:0]
def _clouddq_get_json(uri, auth_header, error_label, timeout=60):
    """GET ``uri`` and return the decoded JSON body of a 200 response.

    Args:
        uri: Fully-qualified REST endpoint to call.
        auth_header: HTTP headers (including the bearer token) to send.
        error_label: Prefix used in the exception message for a non-200 reply.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The parsed JSON body.

    Raises:
        requests.exceptions.RequestException: on transport errors (printed, then re-raised).
        Exception: when the response status code is not 200.
    """
    try:
        # Without a timeout a stalled connection would hang the Airflow task forever.
        response = requests.get(uri, headers=auth_header, timeout=timeout)
    except requests.exceptions.RequestException as err:
        print(err)
        raise
    print("get_clouddq_task_status response status code: ", response.status_code)
    print("get_clouddq_task_status response status text: ", response.text)
    if response.status_code != 200:
        raise Exception(error_label + " response returned response.status_code: " + str(response.status_code))
    # Parse only after the status check, so a non-JSON error page reports the
    # status code instead of surfacing as a JSONDecodeError.
    return json.loads(response.text)


def get_clouddq_task_status(task_id):
    """Poll until the Dataproc batch behind a CloudDQ Dataplex task finishes.

    Lists the jobs of Dataplex task ``task_id`` (in the lake
    ``rideshare_dataplex_lake_name``), takes the first job's ``serviceJob``
    (the underlying Dataproc batch), then polls that batch every 30 seconds
    for up to 59 attempts. Authenticates with the default credentials of the
    Composer worker.

    Returns:
        True once the batch state is SUCCEEDED.

    Raises:
        Exception: if the batch ends FAILED/CANCELLED/ABORTED, an expected field
            is missing, an API call returns a non-200 status, or the polling
            budget is exhausted.
        requests.exceptions.RequestException: on HTTP transport errors.
    """
    print("get_clouddq_task_status STARTED, sleeping for 60 seconds for jobs to start")
    # Give Dataplex time to create the job before listing it.
    time.sleep(60)

    # Default service account of the Composer worker node; refresh to obtain a token.
    creds, _ = google.auth.default()
    creds.refresh(google.auth.transport.requests.Request())
    auth_header = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        'Authorization': 'Bearer ' + creds.token
    }

    # Look up the Dataproc batch that runs the task (only the first job is used, for the demo).
    jobs_uri = f"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/lakes/{rideshare_dataplex_lake_name}/tasks/{task_id}/jobs"
    jobs_json = _clouddq_get_json(jobs_uri, auth_header, "REST API (serviceJob)")
    jobs = jobs_json.get("jobs") or []
    serviceJob = jobs[0].get("serviceJob") if jobs else None
    if not serviceJob:
        raise Exception("Could not find serviceJob in REST API response")
    print("get_clouddq_task_status serviceJob: ", serviceJob)

    # serviceJob looks like ".../batches/{batch_id}"; keep only the batch id.
    # Taking the last path segment (rather than stripping an exact
    # "projects/{project_id}/..." prefix) also works if the resource name
    # carries a project number instead of the project id.
    dataproc_job_id = serviceJob.rsplit("/", 1)[-1]
    print("dataproc_job_id: ", dataproc_job_id)
    serviceJob_uri = f"https://dataproc.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/batches/{dataproc_job_id}"
    print("serviceJob_uri:", serviceJob_uri)

    # Poll a bounded number of times (59 polls, 30 seconds apart).
    for _attempt in range(1, 60):
        batch_json = _clouddq_get_json(serviceJob_uri, auth_header, "REST API")
        if "state" not in batch_json:
            raise Exception("Could not find Job State in REST API response")
        task_status = batch_json["state"]
        print("get_clouddq_task_status task_status: ", task_status)
        if task_status == 'SUCCEEDED':
            return True
        if task_status in ('FAILED', 'CANCELLED', 'ABORTED'):
            raise Exception("Task failed with status of: " + task_status)
        # Any other state (e.g. PENDING or RUNNING): wait and poll again.
        time.sleep(30)

    raise Exception("The process (get_clouddq_task_status) run for too long. Increase the number of iterations.")