def attach_tag_template_to_table()

in cloud-composer/dags/sample-dataplex-run-data-quality.py [0:0]


def attach_tag_template_to_table():
    """Publish the latest data quality results for ``taxi_trips`` as a Data Catalog tag.

    Calls the ``sp_demo_data_quality_table`` stored procedure in BigQuery and,
    for every row it returns, attaches a tag based on the
    ``table_dq_tag_template`` tag template to the ``taxi_trips`` table entry.

    A specific tag template cannot be assigned more than once to a table, so
    any tag on the entry that already uses the template is deleted before the
    new tag is created. As a consequence, if the procedure returns several
    rows, each iteration replaces the tag written by the previous one and only
    the last row's values remain attached.

    Each row is expected to expose ``record_count``, ``latest_execution_ts``,
    ``columns_validated``, ``columns_count``, ``success_percentage``,
    ``failed_percentage`` and ``invocation_id``.

    Reads the module-level ``project_id``, ``taxi_dataset_id`` and
    ``dataplex_region`` values. Returns ``None``; progress is reported with
    ``print``.
    """
    client = bigquery.Client()
    query_job = client.query(f"CALL `{project_id}.{taxi_dataset_id}.sp_demo_data_quality_table`();")
    rows = list(query_job.result())  # Waits for job to complete.
    if not rows:
        # Nothing to publish, so skip the Data Catalog calls entirely.
        return

    # The client, the table entry and the template name do not depend on the
    # row being processed, so resolve them once instead of once per row.
    datacatalog_client = datacatalog_v1.DataCatalogClient()
    resource_name = (
        f"//bigquery.googleapis.com/projects/{project_id}"
        f"/datasets/{taxi_dataset_id}/tables/taxi_trips"
    )
    table_entry = datacatalog_client.lookup_entry(
        request={"linked_resource": resource_name}
    )
    print("attach_tag_template_to_table table_entry.name: ", table_entry.name)

    template_name = f"projects/{project_id}/locations/{dataplex_region}/tagTemplates/table_dq_tag_template"

    for row in rows:
        # Build the tag to attach to the table.
        tag = datacatalog_v1.types.Tag()
        tag.template = template_name
        # NOTE(review): the created tag's name is read back from the
        # create_tag response below, so this value is presumably replaced by
        # the service - confirm before relying on it.
        tag.name = "table_dq_tag_template"

        # (template field id, TagField value attribute, value) triples.
        field_values = (
            ("table_name", "string_value", "taxi_trips"),
            ("record_count", "double_value", row.record_count),
            ("latest_execution_ts", "timestamp_value", row.latest_execution_ts),
            ("columns_validated", "double_value", row.columns_validated),
            ("columns_count", "double_value", row.columns_count),
            ("success_pct", "double_value", row.success_percentage),
            ("failed_pct", "double_value", row.failed_percentage),
            ("invocation_id", "string_value", row.invocation_id),
        )
        for field_id, value_attr, value in field_values:
            tag.fields[field_id] = datacatalog_v1.types.TagField()
            setattr(tag.fields[field_id], value_attr, value)

        # Look for a tag that already uses this template: it has to be removed
        # first because the same template cannot be attached twice.
        existing_name = ""
        for response in datacatalog_client.list_tags(parent=table_entry.name):
            print("response: ", response)
            if response.template == template_name:
                existing_name = response.name
                break

        # Inside the loop this also removes the tag created by the previous
        # iteration. Ideally there would be more than one template for
        # different purposes, since different templates can coexist on a table.
        if existing_name:
            print(f"Delete tag: {existing_name}")
            datacatalog_client.delete_tag(name=existing_name)

        # https://cloud.google.com/python/docs/reference/datacatalog/latest/google.cloud.datacatalog_v1.services.data_catalog.DataCatalogClient#google_cloud_datacatalog_v1_services_data_catalog_DataCatalogClient_create_tag
        created_tag = datacatalog_client.create_tag(parent=table_entry.name, tag=tag)
        print(f"Created tag: {created_tag.name}")