in cloud-composer/dags/sample-dataplex-run-data-quality.py [0:0]
def attach_tag_template_to_table():
    """Run the data-quality stored procedure and tag ``taxi_trips`` with its results.

    Calls ``sp_demo_data_quality_table`` in BigQuery. For every row returned,
    a Data Catalog tag based on the ``table_dq_tag_template`` template is
    built from the row's values. Any tag on the table entry that already uses
    that template is deleted, and the new tag is then created.

    Relies on ``project_id``, ``taxi_dataset_id`` and ``dataplex_region``
    being defined outside this function.
    """
    bigquery_client = bigquery.Client()
    query_job = bigquery_client.query(f"CALL `{project_id}.{taxi_dataset_id}.sp_demo_data_quality_table`();")
    # Waits for job to complete. The rows are materialised so the empty case
    # can be detected before any Data Catalog call is made.
    rows = list(query_job.result())
    if not rows:
        # Nothing to tag: make no Data Catalog calls at all.
        return

    # The client, the table entry and the template path do not depend on the
    # row being processed, so resolve them once instead of once per row.
    datacatalog_client = datacatalog_v1.DataCatalogClient()
    resource_name = (
        f"//bigquery.googleapis.com/projects/{project_id}"
        f"/datasets/{taxi_dataset_id}/tables/taxi_trips"
    )
    table_entry = datacatalog_client.lookup_entry(
        request={"linked_resource": resource_name}
    )
    # Example template path:
    # "projects/data-analytics-demo-ra5migwp3l/locations/REPLACE-REGION/tagTemplates/table_dq_tag_template"
    tag_template_path = f"projects/{project_id}/locations/{dataplex_region}/tagTemplates/table_dq_tag_template"

    for row in rows:
        # Build the tag to attach to the table.
        tag = datacatalog_v1.types.Tag()
        tag.template = tag_template_path
        tag.name = "table_dq_tag_template"

        # (tag field id, TagField value attribute, value) for each field.
        tag_field_values = (
            ("table_name", "string_value", "taxi_trips"),
            ("record_count", "double_value", row.record_count),
            ("latest_execution_ts", "timestamp_value", row.latest_execution_ts),
            ("columns_validated", "double_value", row.columns_validated),
            ("columns_count", "double_value", row.columns_count),
            ("success_pct", "double_value", row.success_percentage),
            ("failed_pct", "double_value", row.failed_percentage),
            ("invocation_id", "string_value", row.invocation_id),
        )
        for field_id, value_attr, value in tag_field_values:
            tag.fields[field_id] = datacatalog_v1.types.TagField()
            setattr(tag.fields[field_id], value_attr, value)

        # Find an existing tag that uses the same template; it has to be
        # removed first because duplicates are not allowed.
        print ("attach_tag_template_to_table table_entry.name: ", table_entry.name)
        existing_tag_name = ""
        for existing_tag in datacatalog_client.list_tags(parent=table_entry.name):
            print("response: ", existing_tag)
            if existing_tag.template == tag_template_path:
                existing_tag_name = existing_tag.name
                break

        # When there are several rows, this removes the tag created for the
        # previous row. Ideally there would be more than one template for
        # different purposes: a specific template cannot be assigned more than
        # once to a table, but different templates can be.
        if existing_tag_name:
            print(f"Delete tag: {existing_tag_name}")
            datacatalog_client.delete_tag(name=existing_tag_name)

        # https://cloud.google.com/python/docs/reference/datacatalog/latest/google.cloud.datacatalog_v1.services.data_catalog.DataCatalogClient#google_cloud_datacatalog_v1_services_data_catalog_DataCatalogClient_create_tag
        created_tag = datacatalog_client.create_tag(parent=table_entry.name, tag=tag)
        print(f"Created tag: {created_tag.name}")