def attach_tag_template_to_columns()

in cloud-composer/dags/sample-rideshare-run-data-quality.py [0:0]


# Table whose columns receive the data-quality tags; used both in the
# BigQuery linked-resource path and as the "table_name" tag field value.
_RIDESHARE_TRIP_TABLE = "bigquery_rideshare_trip"


def _build_column_dq_tag(row, template_name):
    """Build (but do not create) a column_dq_tag_template Tag for one result row.

    Args:
        row: One row returned by ``sp_demo_data_quality_columns``. The attributes
            read are listed in ``field_values`` below.
        template_name: Full resource name of the ``column_dq_tag_template`` tag template.

    Returns:
        A ``datacatalog_v1.types.Tag`` targeting the column named by ``row.column_id``.
    """
    tag = datacatalog_v1.types.Tag()
    tag.template = template_name
    # NOTE(review): the created tag's server-side name is what gets printed after
    # create_tag, so this value is presumably ignored on create — kept as-is; confirm.
    tag.name = "column_dq_tag_template"
    tag.column = row.column_id

    # tag field key -> (TagField value attribute, value to store)
    field_values = {
        "table_name": ("string_value", _RIDESHARE_TRIP_TABLE),
        "invocation_id": ("string_value", row.invocation_id),
        "execution_ts": ("timestamp_value", row.execution_ts),
        "column_id": ("string_value", row.column_id),
        "rule_binding_id": ("string_value", row.rule_binding_id),
        "rule_id": ("string_value", row.rule_id),
        "dimension": ("string_value", row.dimension),
        "rows_validated": ("double_value", row.rows_validated),
        "success_count": ("double_value", row.success_count),
        "success_pct": ("double_value", row.success_percentage),
        "failed_count": ("double_value", row.failed_count),
        "failed_pct": ("double_value", row.failed_percentage),
        "null_count": ("double_value", row.null_count),
        "null_pct": ("double_value", row.null_percentage),
    }
    for key, (value_attr, value) in field_values.items():
        # Same pattern as the Data Catalog samples: insert an empty TagField,
        # then set the typed value on the map entry.
        tag.fields[key] = datacatalog_v1.types.TagField()
        setattr(tag.fields[key], value_attr, value)
    return tag


def _find_existing_column_tag(datacatalog_client, parent, template_name, column_id):
    """Return the resource name of an existing tag from *template_name* on *column_id*.

    A given template cannot be attached to the same column twice, so the caller
    deletes any match before creating a new tag.

    Args:
        datacatalog_client: A ``datacatalog_v1.DataCatalogClient``.
        parent: Resource name of the table entry whose tags are listed.
        template_name: Tag template resource name to match.
        column_id: Column whose ``column_id`` tag field must match.

    Returns:
        The matching tag's resource name, or ``""`` when there is none.
    """
    # Sample list_tags response entry:
    #   name: "projects/data-analytics-demo-ra5migwp3l/locations/us/entryGroups/@bigquery/entries/cHJvamVjdHMvZGF0YS1hbmFseXRpY3MtZGVtby1yYTVtaWd3cDNsL2RhdGFzZXRzL3RheGlfZGF0YXNldC90YWJsZXMvdGF4aV90cmlwcw/tags/CVg1OS7dOJhY"
    #   template: "projects/data-analytics-demo-ra5migwp3l/locations/REPLACE-REGION/tagTemplates/column_dq_tag_template"
    #   fields { key: "column_id" value { display_name: "Column Name" string_value: "DOLocationID" } }
    #   fields { key: "dimension" value { display_name: "Dimension" string_value: "INTEGRITY" } }
    for existing in datacatalog_client.list_tags(parent=parent):
        print("response: ", existing)
        if (
            existing.template == template_name
            and "column_id" in existing.fields
            and existing.fields["column_id"].string_value == column_id
        ):
            print(f"existing_name: {existing.name}")
            return existing.name
    return ""


def attach_tag_template_to_columns():
    """Tag columns of ``bigquery_rideshare_trip`` with their data-quality results.

    Runs the ``sp_demo_data_quality_columns`` stored procedure. For each returned
    row, it (re)creates a Data Catalog tag from ``column_dq_tag_template`` on the
    column named by ``row.column_id``. Any existing tag from the same template on
    that column is deleted first, because a template can be attached only once
    per column.

    This is not meant to handle the same column twice. If several rows name the
    same column, each row replaces the previous row's tag, so only the last one
    survives. Aggregate the results together, or apply different tag templates
    per result, if all of them are needed.

    Uses the module-level ``project_id``, ``rideshare_dataset_id`` and
    ``dataplex_region`` settings.
    """
    client = bigquery.Client()
    query_job = client.query(f"CALL `{project_id}.{rideshare_dataset_id}.sp_demo_data_quality_columns`();")
    rows = list(query_job.result())  # Waits for job to complete.
    if not rows:
        # Nothing to tag; skip the Data Catalog calls entirely.
        return

    # Everything below is identical for every row, so build the client (and its
    # transport) and resolve the table entry once instead of once per row.
    datacatalog_client = datacatalog_v1.DataCatalogClient()
    resource_name = (
        f"//bigquery.googleapis.com/projects/{project_id}"
        f"/datasets/{rideshare_dataset_id}/tables/{_RIDESHARE_TRIP_TABLE}"
    )
    table_entry = datacatalog_client.lookup_entry(
        request={"linked_resource": resource_name}
    )
    template_name = f"projects/{project_id}/locations/{dataplex_region}/tagTemplates/column_dq_tag_template"
    print("attach_tag_template_to_columns table_entry.name: ", table_entry.name)

    for row in rows:
        tag = _build_column_dq_tag(row, template_name)

        # Remove the existing tag (if any) from this template on this column; duplicates are not allowed.
        # This will also remove a tag created earlier in this loop for the same column.
        # Ideally different templates would be used for different results, since a specific template
        # cannot be assigned more than once to a column, but different templates can be.
        # One odd thing: calling create_tag when the template already exists on a column overwrites it,
        # but doing the same for a table errors.
        existing_name = _find_existing_column_tag(
            datacatalog_client, table_entry.name, template_name, tag.column
        )
        if existing_name:
            print(f"Delete tag: {existing_name}")
            datacatalog_client.delete_tag(name=existing_name)

        # https://cloud.google.com/python/docs/reference/datacatalog/latest/google.cloud.datacatalog_v1.services.data_catalog.DataCatalogClient#google_cloud_datacatalog_v1_services_data_catalog_DataCatalogClient_create_tag
        created = datacatalog_client.create_tag(parent=table_entry.name, tag=tag)
        print(f"Created tag: {created.name} on {created.column}")