cloud-composer/dags/sample-dataplex-run-data-quality.py [143:213]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    serviceJob = ""

    # Get the jobs
    # Get the status of each job (or the first for the demo)
    try:
        response = requests.get(uri, headers=auth_header)
        print("get_clouddq_task_status response status code: ", response.status_code)
        print("get_clouddq_task_status response status text: ", response.text)
        response_json = json.loads(response.text)
        if response.status_code == 200:
            if ("jobs" in response_json and len(response_json["jobs"]) > 0):
                serviceJob = response_json["jobs"][0]["serviceJob"]
                print("get_clouddq_task_status serviceJob: ", serviceJob)
            else:
                errorMessage = "Could not find serviceJob in REST API response"
                raise Exception(errorMessage)
        else:
            errorMessage = "REAT API (serviceJob) response returned response.status_code: " + str(response.status_code)
            raise Exception(errorMessage)
    except requests.exceptions.RequestException as err:
        print(err)
        raise err

    dataproc_job_id = serviceJob.replace(f"projects/{project_id}/locations/{dataplex_region}/batches/","")
    print ("dataproc_job_id: ", dataproc_job_id)
    serviceJob_uri  = f"https://dataproc.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/batches/{dataproc_job_id}"
    print ("serviceJob_uri:", serviceJob_uri)

    # Run for for so many interations
    counter  = 1
    while (counter < 60):    
        try:
            response = requests.get(serviceJob_uri, headers=auth_header)
            print("get_clouddq_task_status response status code: ", response.status_code)
            print("get_clouddq_task_status response status text: ", response.text)
            response_json = json.loads(response.text)
            if response.status_code == 200:
                if ("state" in response_json):
                    task_status = response_json["state"]
                    print("get_clouddq_task_status task_status: ", task_status)
                    if (task_status == 'SUCCEEDED'):
                        return True
                    
                    if (task_status == 'FAILED' 
                        or task_status == 'CANCELLED'
                        or task_status == 'ABORTED'):
                        errorMessage = "Task failed with status of: " + task_status
                        raise Exception(errorMessage)

                    # Assuming state is RUNNING or PENDING
                    time.sleep(30)
                else:
                    errorMessage = "Could not find Job State in REST API response"
                    raise Exception(errorMessage)
            else:
                errorMessage = "REAT API response returned response.status_code: " + str(response.status_code)
                raise Exception(errorMessage)
        except requests.exceptions.RequestException as err:
            print(err)
            raise err
        counter = counter + 1

    errorMessage = "The process (get_clouddq_task_status) run for too long.  Increase the number of iterations."
    raise Exception(errorMessage)


# Run a SQL query to get the consolidated table results
# Attach a tag template in data catalog at the table level for taxi trips
# NOTE: This will overwrite the template over and over (it does not add a new one)
def attach_tag_template_to_table():
    client = bigquery.Client()
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



cloud-composer/dags/sample-rideshare-run-data-quality.py [142:212]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    serviceJob = ""

    # Get the jobs
    # Get the status of each job (or the first for the demo)
    try:
        response = requests.get(uri, headers=auth_header)
        print("get_clouddq_task_status response status code: ", response.status_code)
        print("get_clouddq_task_status response status text: ", response.text)
        response_json = json.loads(response.text)
        if response.status_code == 200:
            if ("jobs" in response_json and len(response_json["jobs"]) > 0):
                serviceJob = response_json["jobs"][0]["serviceJob"]
                print("get_clouddq_task_status serviceJob: ", serviceJob)
            else:
                errorMessage = "Could not find serviceJob in REST API response"
                raise Exception(errorMessage)
        else:
            errorMessage = "REAT API (serviceJob) response returned response.status_code: " + str(response.status_code)
            raise Exception(errorMessage)
    except requests.exceptions.RequestException as err:
        print(err)
        raise err

    dataproc_job_id = serviceJob.replace(f"projects/{project_id}/locations/{dataplex_region}/batches/","")
    print ("dataproc_job_id: ", dataproc_job_id)
    serviceJob_uri  = f"https://dataproc.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/batches/{dataproc_job_id}"
    print ("serviceJob_uri:", serviceJob_uri)

    # Run for for so many interations
    counter  = 1
    while (counter < 60):    
        try:
            response = requests.get(serviceJob_uri, headers=auth_header)
            print("get_clouddq_task_status response status code: ", response.status_code)
            print("get_clouddq_task_status response status text: ", response.text)
            response_json = json.loads(response.text)
            if response.status_code == 200:
                if ("state" in response_json):
                    task_status = response_json["state"]
                    print("get_clouddq_task_status task_status: ", task_status)
                    if (task_status == 'SUCCEEDED'):
                        return True
                    
                    if (task_status == 'FAILED' 
                        or task_status == 'CANCELLED'
                        or task_status == 'ABORTED'):
                        errorMessage = "Task failed with status of: " + task_status
                        raise Exception(errorMessage)

                    # Assuming state is RUNNING or PENDING
                    time.sleep(30)
                else:
                    errorMessage = "Could not find Job State in REST API response"
                    raise Exception(errorMessage)
            else:
                errorMessage = "REAT API response returned response.status_code: " + str(response.status_code)
                raise Exception(errorMessage)
        except requests.exceptions.RequestException as err:
            print(err)
            raise err
        counter = counter + 1

    errorMessage = "The process (get_clouddq_task_status) run for too long.  Increase the number of iterations."
    raise Exception(errorMessage)


# Run a SQL query to get the consolidated table results
# Attach a tag template in data catalog at the table level for rideshare trips
# NOTE: This will overwrite the template over and over (it does not add a new one)
def attach_tag_template_to_table():
    client = bigquery.Client()
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



