in cloud-composer/dags/sample-dataplex-run-data-quality.py [0:0]
def _set_tag_field(tag, key, value_attr, value):
    """Add a fresh TagField named ``key`` to ``tag`` and store ``value`` in it.

    Args:
        tag: The ``datacatalog_v1.types.Tag`` being built.
        key: Field id as declared in the tag template (e.g. "rule_id").
        value_attr: Which typed slot of the TagField to populate:
            "string_value", "double_value" or "timestamp_value".
        value: The value to store in that slot.
    """
    tag.fields[key] = datacatalog_v1.types.TagField()
    setattr(tag.fields[key], value_attr, value)


def _build_column_dq_tag(row, template_name):
    """Build (but do not create) a column-level data-quality Tag for one result row.

    Args:
        row: One row returned by ``sp_demo_data_quality_columns``; it must
            expose the attributes read below (column_id, invocation_id, ...).
        template_name: Full resource name of ``column_dq_tag_template``.

    Returns:
        A ``datacatalog_v1.types.Tag`` bound to the column ``row.column_id``.
    """
    tag = datacatalog_v1.types.Tag()
    tag.template = template_name
    tag.name = "column_dq_tag_template"
    tag.column = row.column_id

    string_fields = {
        "table_name": "taxi_trips",
        "invocation_id": row.invocation_id,
        "column_id": row.column_id,
        "rule_binding_id": row.rule_binding_id,
        "rule_id": row.rule_id,
        "dimension": row.dimension,
    }
    for key, value in string_fields.items():
        _set_tag_field(tag, key, "string_value", value)

    _set_tag_field(tag, "execution_ts", "timestamp_value", row.execution_ts)

    # Tag-field names (e.g. success_pct) differ from the row column names
    # (e.g. success_percentage) for the percentage fields.
    double_fields = {
        "rows_validated": row.rows_validated,
        "success_count": row.success_count,
        "success_pct": row.success_percentage,
        "failed_count": row.failed_count,
        "failed_pct": row.failed_percentage,
        "null_count": row.null_count,
        "null_pct": row.null_percentage,
    }
    for key, value in double_fields.items():
        _set_tag_field(tag, key, "double_value", value)

    return tag


def _find_existing_column_tag(datacatalog_client, parent, template_name, column):
    """Return the resource name of a tag from ``template_name`` on ``column``, or None.

    A tag matches when its template equals ``template_name`` and its
    "column_id" field equals ``column``. Only the first match is returned.

    Sample ``list_tags`` item (abridged):
        name: "projects/data-analytics-demo-ra5migwp3l/locations/us/entryGroups/@bigquery/entries/cHJvamVjdHMvZGF0YS1hbmFseXRpY3MtZGVtby1yYTVtaWd3cDNsL2RhdGFzZXRzL3RheGlfZGF0YXNldC90YWJsZXMvdGF4aV90cmlwcw/tags/CVg1OS7dOJhY"
        template: "projects/data-analytics-demo-ra5migwp3l/locations/REPLACE-REGION/tagTemplates/column_dq_tag_template"
        fields { key: "column_id" value { display_name: "Column Name" string_value: "DOLocationID" } }
        fields { key: "dimension" value { display_name: "Dimension" string_value: "INTEGRITY" } }
    """
    for response in datacatalog_client.list_tags(parent=parent):
        print("response: ", response)
        if (response.template == template_name
                and "column_id" in response.fields
                and response.fields["column_id"].string_value == column):
            print(f"existing_name: {response.name}")
            return response.name
    return None


def attach_tag_template_to_columns():
    """Tag columns of ``taxi_trips`` with their latest data-quality results.

    Runs the ``sp_demo_data_quality_columns`` stored procedure. For every row
    it returns, this attaches a ``column_dq_tag_template`` tag to the column
    ``row.column_id`` of ``{taxi_dataset_id}.taxi_trips``. Any tag from the
    same template already on that column is deleted first, because a given
    template can be attached to a column only once.

    The procedure is expected to return each column at most once. This code
    does not aggregate: if a column appears twice, the later row's tag
    replaces the earlier one. Aggregate the results, or use a different tag
    template per result, if you need both.

    Relies on the module-level ``project_id``, ``taxi_dataset_id`` and
    ``dataplex_region`` settings.
    """
    client = bigquery.Client()
    query_job = client.query(f"CALL `{project_id}.{taxi_dataset_id}.sp_demo_data_quality_columns`();")
    rows = list(query_job.result())  # Waits for job to complete.
    if not rows:
        # Nothing to tag: skip creating a Data Catalog client and the entry lookup.
        return

    # The client, table entry and template name are identical for every row,
    # so resolve them once rather than once per row.
    datacatalog_client = datacatalog_v1.DataCatalogClient()
    resource_name = (
        f"//bigquery.googleapis.com/projects/{project_id}"
        f"/datasets/{taxi_dataset_id}/tables/taxi_trips"
    )
    table_entry = datacatalog_client.lookup_entry(
        request={"linked_resource": resource_name}
    )
    template_name = f"projects/{project_id}/locations/{dataplex_region}/tagTemplates/column_dq_tag_template"

    for row in rows:
        tag = _build_column_dq_tag(row, template_name)

        # Existing tags are re-listed for every row (not cached up front) so
        # a tag created for an earlier row is found and replaced when the same
        # column shows up again.
        print("attach_tag_template_to_columns table_entry.name: ", table_entry.name)
        existing_name = _find_existing_column_tag(
            datacatalog_client, table_entry.name, template_name, tag.column
        )

        # A specific template cannot be assigned to a column more than once, so
        # remove the old tag first. (Different templates could coexist.)
        # Note: create_tag on a column that already has this template appears
        # to overwrite, but doing the same on a table errors, so delete
        # explicitly.
        if existing_name:
            print(f"Delete tag: {existing_name}")
            datacatalog_client.delete_tag(name=existing_name)

        # https://cloud.google.com/python/docs/reference/datacatalog/latest/google.cloud.datacatalog_v1.services.data_catalog.DataCatalogClient#google_cloud_datacatalog_v1_services_data_catalog_DataCatalogClient_create_tag
        created = datacatalog_client.create_tag(parent=table_entry.name, tag=tag)
        print(f"Created tag: {created.name} on {created.column}")