datascan/python-api-sample-scripts/dq_get_job_results.py (73 lines of code) (raw):
#!/usr/bin/env python
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from google.cloud import dataplex_v1
import argparse
from google.protobuf.json_format import MessageToJson
# Command-line interface. The parsed namespace is kept in the module-level
# `args`, which get_job_results() reads to find the DataScan to inspect.
parser = argparse.ArgumentParser()
parser.add_argument(
    '--datascan_name',
    '-dsn',
    # Without a name there is nothing to query, so let argparse report the
    # missing argument instead of passing None on to the Dataplex API.
    required=True,
    help='Full project path of the Datascan. Allowed format = "projects/{PROJECT-ID}/locations/{REGION}/dataScans/{DATASCAN-NAME}"',
)
args = parser.parse_args()
def get_job_results():
    """Print the data-quality results of every succeeded job of a DataScan.

    The DataScan is identified by the module-level ``args.datascan_name``
    (``projects/{PROJECT-ID}/locations/{REGION}/dataScans/{DATASCAN-NAME}``).
    The function fetches the scan, lists its jobs, re-fetches each job with
    the FULL view and, for jobs in the SUCCEEDED state, prints scan/job
    identifiers, rule pass/fail counts, and the per-rule results.

    Raises:
        ValueError: If ``args.datascan_name`` is missing or does not match
            the expected DataScan resource-name format.
    """
    # Create a Dataplex client object
    print("Authenticating Dataplex Client...")
    client = dataplex_v1.DataScanServiceClient()

    # Validate the resource name before any network call: an unparsable
    # name yields an empty dict, which would otherwise only surface later
    # as a confusing error when the parts are printed.
    datascan_name = args.datascan_name
    scan_path = client.parse_data_scan_path(datascan_name or "")
    if not scan_path:
        raise ValueError(
            "Invalid --datascan_name " + repr(datascan_name) + '. Allowed format = '
            '"projects/{PROJECT-ID}/locations/{REGION}/dataScans/{DATASCAN-NAME}"'
        )
    dq_project = scan_path.get('project')
    dq_scan_id = scan_path.get('dataScan')

    # Fetch the scan definition itself.
    scan_request = dataplex_v1.GetDataScanRequest(name=datascan_name)
    response_scan = client.get_data_scan(request=scan_request)
    dq_scan_name = response_scan.name

    # The scanned data source is taken from `data.resource`, falling back to
    # `data.entity` when `resource` is empty; the table name is the last
    # path segment of that reference.
    table_reference = response_scan.data.resource or response_scan.data.entity
    dq_table = table_reference.split("/")[-1]

    # Collect the names of all jobs of this scan (the pager is iterated
    # to exhaustion; page_size only bounds each underlying request).
    list_request = dataplex_v1.ListDataScanJobsRequest(
        parent=datascan_name,
        page_size=10,
    )
    job_names = [job.name for job in client.list_data_scan_jobs(request=list_request)]
    print('Jobs scanned: ' + str(len(job_names)))

    # Read Jobs data
    for job_name in job_names:
        job_request = dataplex_v1.GetDataScanJobRequest(
            name=job_name,
            # FULL view is requested so the job carries its result payload.
            view=dataplex_v1.GetDataScanJobRequest.DataScanJobView.FULL,
        )
        job_result = client.get_data_scan_job(request=job_request)

        # Skips jobs if not in succeeded state
        if job_result.state != dataplex_v1.DataScanJob.State.SUCCEEDED:
            continue

        dq_job_id = job_result.uid
        dq_result = job_result.data_quality_result
        rules = dq_result.rules

        # `passed` is a boolean per rule, so every rule is either passing
        # or failing.
        passing_rules = sum(1 for rule in rules if rule.passed)
        failing_rules = len(rules) - passing_rules

        print(f' -->Passing rules = {passing_rules}')
        print(f' -->Failing rules = {failing_rules}')
        print(f"dq_scan_name --> {dq_scan_name}")
        print(f"dq_scan_id --> {dq_scan_id}")
        print(f"dq_table --> {dq_table}")
        print(f"dq_project --> {dq_project}")
        print(f"dq_job_id --> {dq_job_id}")
        print(f"job_result.data_quality_result.row_count --> {dq_result.row_count!s}")
        print(f"job_result.data_quality_result.passed --> {dq_result.passed!s}")
        print(f"len(job_result.data_quality_result.rules) --> {len(rules)}")
        print(f"passing_rules --> {passing_rules}")
        print(f"job_result.start_time --> {job_result.start_time!s}")
        print(f"job_result.end_time --> {job_result.end_time!s}")
        print("MessageToJson(job_result.data_quality_result.scanned_data._pb) --> " + MessageToJson(dq_result.scanned_data._pb))
        print("MessageToJson(job_result.data_quality_result._pb) --> " + MessageToJson(dq_result._pb))

        # Per-rule details for this job.
        for rule_result in rules:
            print("MessageToJson(rule_result.rule._pb) --> " + MessageToJson(rule_result.rule._pb))
            print(f"rule_result.rule.dimension --> {rule_result.rule.dimension}")
            print(f"rule_result.passed --> {rule_result.passed!s}")
            print(f"rule_result.pass_ratio --> {rule_result.pass_ratio!s}")
            print(f"rule_result.failing_rows_query --> {rule_result.failing_rows_query}")
# Script entry point: query Dataplex only when this file is executed
# directly, not as a side effect of importing it.
if __name__ == "__main__":
    get_job_results()