# get_clouddq_task_status()
#
# From cloud-composer/dags/sample-rideshare-run-data-quality.py

def get_clouddq_task_status(task_id):
    """Poll a Dataplex data-quality task until its Dataproc batch job finishes.

    Flow: wait 60s for the job to be created, look up the Dataplex task's jobs
    via the Dataplex REST API, extract the backing Dataproc batch id from the
    first job's ``serviceJob`` field, then poll the Dataproc batches API every
    30s (up to 59 iterations) until the batch reaches a terminal state.

    Args:
        task_id: Dataplex task id whose job should be monitored.

    Returns:
        True when the Dataproc batch state is SUCCEEDED.

    Raises:
        Exception: if the job ends FAILED/CANCELLED/ABORTED, a REST call
            returns a non-200 status, an expected field is missing from a
            response, or the polling loop exceeds its iteration budget.
        requests.exceptions.RequestException: re-raised on transport errors.

    NOTE(review): relies on module-level globals defined elsewhere in the
    file: project_id, dataplex_region, rideshare_dataplex_lake_name, plus the
    time/json/requests/google.auth imports.
    """
    # Wait for job to start
    print("get_clouddq_task_status STARTED, sleeping for 60 seconds for jobs to start")
    time.sleep(60)

    # Get auth (default service account running composer worker node)
    creds, project = google.auth.default()
    auth_req = google.auth.transport.requests.Request()  # required to access access token
    creds.refresh(auth_req)
    access_token = creds.token
    auth_header = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        'Authorization': 'Bearer ' + access_token
    }

    uri = f"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/lakes/{rideshare_dataplex_lake_name}/tasks/{task_id}/jobs"
    serviceJob = ""

    # Get the jobs
    # Get the status of each job (or the first for the demo)
    try:
        response = requests.get(uri, headers=auth_header)
        print("get_clouddq_task_status response status code: ", response.status_code)
        print("get_clouddq_task_status response status text: ", response.text)
        if response.status_code == 200:
            # BUG FIX: parse the body only on the 200 path. Error bodies are
            # not guaranteed to be JSON, and parsing them first raised
            # JSONDecodeError instead of the intended status-code Exception.
            response_json = json.loads(response.text)
            if ("jobs" in response_json and len(response_json["jobs"]) > 0):
                serviceJob = response_json["jobs"][0]["serviceJob"]
                print("get_clouddq_task_status serviceJob: ", serviceJob)
            else:
                errorMessage = "Could not find serviceJob in REST API response"
                raise Exception(errorMessage)
        else:
            # BUG FIX: corrected "REAT API" typo in the error message.
            errorMessage = "REST API (serviceJob) response returned response.status_code: " + str(response.status_code)
            raise Exception(errorMessage)
    except requests.exceptions.RequestException as err:
        print(err)
        raise err

    # serviceJob looks like projects/{p}/locations/{l}/batches/{id};
    # strip the prefix to get the bare Dataproc batch id.
    dataproc_job_id = serviceJob.replace(f"projects/{project_id}/locations/{dataplex_region}/batches/", "")
    print("dataproc_job_id: ", dataproc_job_id)
    serviceJob_uri = f"https://dataproc.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/batches/{dataproc_job_id}"
    print("serviceJob_uri:", serviceJob_uri)

    # Poll for a bounded number of iterations (~30 minutes at 30s per poll)
    counter = 1
    while (counter < 60):
        try:
            response = requests.get(serviceJob_uri, headers=auth_header)
            print("get_clouddq_task_status response status code: ", response.status_code)
            print("get_clouddq_task_status response status text: ", response.text)
            if response.status_code == 200:
                # BUG FIX: parse only on 200 (see note on the first request).
                response_json = json.loads(response.text)
                if ("state" in response_json):
                    task_status = response_json["state"]
                    print("get_clouddq_task_status task_status: ", task_status)
                    if (task_status == 'SUCCEEDED'):
                        return True

                    if (task_status == 'FAILED'
                        or task_status == 'CANCELLED'
                        or task_status == 'ABORTED'):
                        errorMessage = "Task failed with status of: " + task_status
                        raise Exception(errorMessage)

                    # Assuming state is RUNNING or PENDING
                    time.sleep(30)
                else:
                    errorMessage = "Could not find Job State in REST API response"
                    raise Exception(errorMessage)
            else:
                # BUG FIX: corrected "REAT API" typo in the error message.
                errorMessage = "REST API response returned response.status_code: " + str(response.status_code)
                raise Exception(errorMessage)
        except requests.exceptions.RequestException as err:
            print(err)
            raise err
        counter = counter + 1

    errorMessage = "The process (get_clouddq_task_status) run for too long.  Increase the number of iterations."
    raise Exception(errorMessage)