datascan/python-api-sample-scripts/dq_create_scan.py

#!/usr/bin/env python

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# !pip install google-cloud
# !pip install google-cloud-dataplex
# !pip install google-cloud-storage

import argparse

from google.cloud import dataplex_v1

# Arguments:
parser = argparse.ArgumentParser()

# data_scan_id
parser.add_argument('--data_scan_id', required=True,
                    help='ID for the new DQ Scan. Must be unique. Only letters, numbers, and dashes allowed. '
                         'Cannot be changed after creation. '
                         'See: https://cloud.google.com/compute/docs/naming-resources#resource-name-format')

# project_location / "parent"
parser.add_argument('--dq_project', required=True,
                    help='The project where the Dataplex Data Quality Scan will reside. '
                         'Allowed format: projects/{PROJECT-ID}/locations/{REGION-ID}. '
                         'Example: projects/abc-xyz/locations/us-central1. Region refers to a GCP region.')

# dataset_table resource
parser.add_argument('--data_source_path', required=True,
                    help='Full resource path of the source table. '
                         'Allowed format: //bigquery.googleapis.com/projects/{PROJECT-ID}/datasets/{DATASET-ID}/tables/{TABLE}')

# bq_export table
parser.add_argument('--export_results_path', required=True,
                    help='Full resource path of the table the DQ results are exported to. '
                         'Allowed format: //bigquery.googleapis.com/projects/{PROJECT-ID}/datasets/{DATASET-ID}/tables/{TABLE}')

# display_name (optional)
parser.add_argument('--display_name', required=False,
                    help='Display name for the DQ Scan. This field is optional.')

args = parser.parse_args()
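
# Example invocation (all resource names below are illustrative placeholders):
#
#   python dq_create_scan.py \
#       --data_scan_id my-dq-scan \
#       --dq_project projects/my-project/locations/us-central1 \
#       --data_source_path //bigquery.googleapis.com/projects/my-project/datasets/my_dataset/tables/my_table \
#       --export_results_path //bigquery.googleapis.com/projects/my-project/datasets/my_dataset/tables/dq_results
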

def create_data_scan():
    '''
    Given a list of data quality rules, create a new DataScan resource and
    populate it with the given rules. All arguments are read from the command
    line (see the argparse definitions above).

    Args:
        data_source_path - Full resource path of the source table.
            Allowed format: //bigquery.googleapis.com/projects/{PROJECT-ID}/datasets/{DATASET-ID}/tables/{TABLE}
        dq_project - The project where the Dataplex Data Quality Scan will reside.
            Allowed format: projects/{PROJECT-ID}/locations/{REGION-ID}.
            Example: projects/abc-xyz/locations/us-central1. Region refers to a GCP region.
        data_scan_id - ID for the new DQ Scan. Must be unique. Only letters, numbers,
            and dashes allowed. Cannot be changed after creation.
            See: https://cloud.google.com/compute/docs/naming-resources#resource-name-format
        export_results_path - Full resource path of the table the DQ results are exported to.
            Allowed format: //bigquery.googleapis.com/projects/{PROJECT-ID}/datasets/{DATASET-ID}/tables/{TABLE}

    Returns:
        The created DataScan resource in your Dataplex project.
    '''
    # Create a Dataplex client object
    print("Authenticating Dataplex Client...")
    dataplex_client = dataplex_v1.DataScanServiceClient()

    # Define DQ Rules:
    ### SAMPLE RULES ###
    rules = [
        {   # Rule 1
            "column": "category",
            "dimension": "VALIDITY",
            "ignore_null": False,
            "set_expectation": {
                "values": ["Office Supplies", "Technology", "Furniture"]
            },
            "threshold": 0.98  # Value between 0 and 1
        },
        {   # Rule 2
            "column": "order_id",
            "dimension": "COMPLETENESS",
            "non_null_expectation": {},
            "threshold": 0.98
        },
        {   # Rule 3
            "column": "sales",
            "dimension": "ACCURACY",
            "row_condition_expectation": {
                "sql_expression": "sales > 0"
            },
            "threshold": 0.90
        },
        {   # Rule 4
            "column": "country",
            "dimension": "UNIQUENESS",
            "uniqueness_expectation": {},
            "ignore_null": True,
            "threshold": 0.86
        },
    ]

    # Define a DataQualitySpec()
    print("Creating DataQualitySpec...")
    dq_spec = dataplex_v1.DataQualitySpec()
    dq_spec.rules = rules
    dq_spec.post_scan_actions = {
        # Writes the results of the DQ Scan to a table in BQ. Creates the table if it does not exist.
        "bigquery_export": {
            "results_table": args.export_results_path
        }
    }

    # Define a DataScan()
    #   resource (str)
    #   data_quality_spec = DataQualitySpec()
    print("Creating DataScan...")
    data_scan = dataplex_v1.DataScan()
    # DATA is one-of `resource` or `entity`
    data_scan.data.resource = args.data_source_path  # format: "//bigquery.googleapis.com/projects/{PROJECT-ID}/datasets/{DATASET}/tables/{TABLE}"
    # data_scan.data.entity = args.data_source_path  # format: "projects/{PROJECT-ID}/locations/{REGION}/lakes/{LAKE}/zones/{ZONE}/entities/{ASSET}"
    if args.display_name:
        data_scan.display_name = args.display_name  # Optional field
    # data_scan.labels = {"key": "value"}  # Optional field; if set, must be a dict of string key/value pairs
    data_scan.data_quality_spec = dq_spec
    data_scan.execution_spec = {
        # The trigger is either "on_demand" or "schedule". If not specified, the default is `on_demand`.
        # If `field` is set, it indicates an "incremental" scan.
        "trigger": {
            "on_demand": {}
            # "schedule": {
            #     "cron": "0 0 3 * *"
            # }
        }
        # , "field": "incremental_date"  # Optional field
    }

    # Define a CreateDataScanRequest()
    #   parent (str)
    #   data_scan = DataScan()
    #   data_scan_id (str)
    print("Creating a CreateDataScanRequest...")
    create_scan_request = dataplex_v1.CreateDataScanRequest()
    create_scan_request.parent = args.dq_project  # format: "projects/{PROJECT-ID}/locations/{REGION}"
    create_scan_request.data_scan = data_scan
    create_scan_request.data_scan_id = args.data_scan_id

    # Make the scan request. This returns a long-running operation.
    print("Creating AutoDQ Scan operation...")
    create_scan_operation = dataplex_client.create_data_scan(request=create_scan_request)

    # Handle the response (blocks until the operation completes)
    create_scan_response = create_scan_operation.result()
    print("Successfully created Scan. Here's the output response:")
    print(create_scan_response)
    print("SUCCESS! DQ Scan created successfully.")


if __name__ == '__main__':
    create_data_scan()
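
# Optional follow-up (a commented-out sketch, not run by this script): once the
# scan exists, it can be triggered on demand with run_data_scan(). The DataScan
# name format is "projects/{PROJECT-ID}/locations/{REGION}/dataScans/{DATA-SCAN-ID}".
#
#   client = dataplex_v1.DataScanServiceClient()
#   run_request = dataplex_v1.RunDataScanRequest(
#       name=f"{args.dq_project}/dataScans/{args.data_scan_id}"
#   )
#   run_response = client.run_data_scan(request=run_request)
#   print(run_response.job)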