cloud-composer/dags/sample-dataplex-run-data-quality.py [76:140]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
data_quality_dataset_id               = "dataplex_data_quality"
data_quality_table_name               = "data_quality_results"
DATAPLEX_PUBLIC_GCS_BUCKET_NAME       = f"dataplex-clouddq-artifacts-{dataplex_region}"
CLOUDDQ_EXECUTABLE_FILE_PATH          = f"gs://{DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip"
CLOUDDQ_EXECUTABLE_HASHSUM_FILE_PATH  = f"gs://{DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip.hashsum" 
FULL_TARGET_TABLE_NAME                = f"{project_id}.{data_quality_dataset_id}.{data_quality_table_name}"  
TRIGGER_SPEC_TYPE                     = "ON_DEMAND"  

spark_python_script_file        = f"gs://{DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq_pyspark_driver.py"

# NOTE: The BigQuery region value is case sensitive, so it is normalized to uppercase
bigquery_region = bigquery_region.upper()

# https://cloud.google.com/dataplex/docs/reference/rpc/google.cloud.dataplex.v1
# https://cloud.google.com/dataplex/docs/reference/rpc/google.cloud.dataplex.v1#google.cloud.dataplex.v1.Task.InfrastructureSpec.VpcNetwork
data_quality_config = {
    "spark": {
        "python_script_file": spark_python_script_file,
        "file_uris": [CLOUDDQ_EXECUTABLE_FILE_PATH,
                      CLOUDDQ_EXECUTABLE_HASHSUM_FILE_PATH,
                      yaml_path
                      ],
        "infrastructure_spec" : {
            "vpc_network" : {
                "sub_network" : vpc_subnet_name
            }
        },                      
    },
    "execution_spec": {
        "service_account": service_account_to_run_dataplex,
        "max_job_execution_lifetime" : Duration(seconds=2*60*60),
        "args": {
            "TASK_ARGS": f"clouddq-executable.zip, \
                 ALL, \
                 {yaml_path}, \
                --gcp_project_id={project_id}, \
                --gcp_region_id={bigquery_region}, \
                --gcp_bq_dataset_id={data_quality_dataset_id}, \
                --target_bigquery_summary_table={FULL_TARGET_TABLE_NAME}"
        }
    },
    "trigger_spec": {
        "type_": TRIGGER_SPEC_TYPE
    },
    "description": "CloudDQ Airflow Task"
}


# Check on the status of the job:
# call the Dataplex REST API, look up the underlying Dataproc job, and check that job's status
def get_clouddq_task_status(task_id):
    # Wait for job to start
    print ("get_clouddq_task_status STARTED, sleeping for 60 seconds for jobs to start")
    time.sleep(60)

    # Get credentials (the default service account running the Composer worker node)
    creds, project = google.auth.default()
    auth_req = google.auth.transport.requests.Request() # required to refresh credentials and obtain an access token
    creds.refresh(auth_req)
    access_token = creds.token
    auth_header = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        'Authorization': 'Bearer ' + access_token
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
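
The excerpt stops before the config is actually submitted to Dataplex. As a minimal sketch (not the sample file's own code), assuming the Airflow Google provider's DataplexCreateTaskOperator is used and with placeholder values for the lake id and Dataplex task id, data_quality_config could be submitted like this:

# Hedged sketch: submits data_quality_config as a Dataplex task.
# "my-dataplex-lake" and "clouddq-airflow-task" are placeholders, not names from the sample;
# project_id and dataplex_region are defined earlier in the DAG file.
from airflow.providers.google.cloud.operators.dataplex import DataplexCreateTaskOperator

create_dataplex_task = DataplexCreateTaskOperator(
    task_id="create_dataplex_task",           # Airflow task id (placeholder)
    project_id=project_id,
    region=dataplex_region,
    lake_id="my-dataplex-lake",               # placeholder Dataplex lake id
    dataplex_task_id="clouddq-airflow-task",  # placeholder Dataplex task id
    body=data_quality_config,
)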



cloud-composer/dags/sample-rideshare-run-data-quality.py [75:139]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
data_quality_dataset_id               = "dataplex_data_quality"
data_quality_table_name               = "data_quality_results"
DATAPLEX_PUBLIC_GCS_BUCKET_NAME       = f"dataplex-clouddq-artifacts-{dataplex_region}"
CLOUDDQ_EXECUTABLE_FILE_PATH          = f"gs://{DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip"
CLOUDDQ_EXECUTABLE_HASHSUM_FILE_PATH  = f"gs://{DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip.hashsum" 
FULL_TARGET_TABLE_NAME                = f"{project_id}.{data_quality_dataset_id}.{data_quality_table_name}"  
TRIGGER_SPEC_TYPE                     = "ON_DEMAND"  

spark_python_script_file        = f"gs://{DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq_pyspark_driver.py"

# NOTE: The BigQuery region value is case sensitive, so it is normalized to uppercase
bigquery_region = bigquery_region.upper()

# https://cloud.google.com/dataplex/docs/reference/rpc/google.cloud.dataplex.v1
# https://cloud.google.com/dataplex/docs/reference/rpc/google.cloud.dataplex.v1#google.cloud.dataplex.v1.Task.InfrastructureSpec.VpcNetwork
data_quality_config = {
    "spark": {
        "python_script_file": spark_python_script_file,
        "file_uris": [CLOUDDQ_EXECUTABLE_FILE_PATH,
                      CLOUDDQ_EXECUTABLE_HASHSUM_FILE_PATH,
                      yaml_path
                      ],
        "infrastructure_spec" : {
            "vpc_network" : {
                "sub_network" : vpc_subnet_name
            }
        },                      
    },
    "execution_spec": {
        "service_account": service_account_to_run_dataplex,
        "max_job_execution_lifetime" : Duration(seconds=2*60*60),
        "args": {
            "TASK_ARGS": f"clouddq-executable.zip, \
                 ALL, \
                 {yaml_path}, \
                --gcp_project_id={project_id}, \
                --gcp_region_id={bigquery_region}, \
                --gcp_bq_dataset_id={data_quality_dataset_id}, \
                --target_bigquery_summary_table={FULL_TARGET_TABLE_NAME}"
        }
    },
    "trigger_spec": {
        "type_": TRIGGER_SPEC_TYPE
    },
    "description": "CloudDQ Airflow Task"
}


# Check on the status of the job:
# call the Dataplex REST API, look up the underlying Dataproc job, and check that job's status
def get_clouddq_task_status(task_id):
    # Wait for job to start
    print ("get_clouddq_task_status STARTED, sleeping for 60 seconds for jobs to start")
    time.sleep(60)

    # Get credentials (the default service account running the Composer worker node)
    creds, project = google.auth.default()
    auth_req = google.auth.transport.requests.Request() # required to refresh credentials and obtain an access token
    creds.refresh(auth_req)
    access_token = creds.token
    auth_header = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        'Authorization': 'Bearer ' + access_token
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
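
Both excerpts end right after the authorization header is built, before any HTTP request is made. As a hedged sketch of the next step (not the sample files' own code), the header could be used to poll the Dataplex v1 tasks.jobs.list endpoint and read the latest job's state; the lake name below is a placeholder, while project_id and dataplex_region are defined earlier in the DAGs:

# Hedged sketch: polls Dataplex for jobs spawned by the CloudDQ task and returns a state.
import requests

def poll_clouddq_task_state(task_id, auth_header, dataplex_lake_name="my-dataplex-lake"):
    uri = (
        f"https://dataplex.googleapis.com/v1/projects/{project_id}"
        f"/locations/{dataplex_region}/lakes/{dataplex_lake_name}"
        f"/tasks/{task_id}/jobs"
    )
    response = requests.get(uri, headers=auth_header)
    response.raise_for_status()
    jobs = response.json().get("jobs", [])
    # Return the state (e.g. RUNNING, SUCCEEDED, FAILED) of the first job in the list,
    # assumed here to be the most recent; otherwise report that no job has started yet.
    return jobs[0]["state"] if jobs else "STATE_UNSPECIFIED"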



