def create_catalog_tags()

in common/data_mesh/src/data_mesh_client.py [0:0]


    def create_catalog_tags(self,
                            bq_asset_annotation_spec: dmt.BqAssetAnnotation,
                            templates_spec: dmt.CatalogTagTemplates) -> None:
        """Create and annotate the catalog tags in the specified BQ asset.

        This creates the catalog tags at both the asset and field levels.

        Args:
            bq_asset_annotation_spec: data_mesh_types.BqAssetAnnotation that
                specifies a list of asset and field catalog tags to create.
            templates_spec: data_mesh_types.CatalogTagTemplates that specifies
                a list of templates that include all those referenced in the
                BqAssetAnnotation.
        """
        project, dataset, table = bq_asset_annotation_spec.name.split(".")
        entry_name = get_bq_entry_name(project, dataset, table)

        try:
            table_entry = self._catalog_client.lookup_entry(
                request={"linked_resource": entry_name})
        except google_exc.PermissionDenied:
            # TODO: collect all skipped tables into single location/msg.
            logging.warning(
                "Permission denied on asset '%s' or it does not exist, "
                "skipping catalog tags.", bq_asset_annotation_spec.name)
            return

        templates_spec_map = {
            template.display_name: template
            for template in templates_spec.templates
        }

        # This will include catalog tags at the asset and field levels.
        existing_catalog_tags = self._list_resources(dmt.CatalogTag,
                                                     table_entry.name)
        # TODO: check if tag is used at the wrong level (field vs asset).
        for asset_tag in bq_asset_annotation_spec.catalog_tags:
            template_spec = templates_spec_map[asset_tag.display_name]
            self.create_catalog_tag(project,
                                    asset_tag,
                                    template_spec,
                                    table_entry.name,
                                    existing_catalog_tags=existing_catalog_tags)

        for field in bq_asset_annotation_spec.fields:
            for field_tag in field.catalog_tags:
                template_spec = templates_spec_map[field_tag.display_name]
                self.create_catalog_tag(
                    project,
                    field_tag,
                    template_spec,
                    table_entry.name,
                    column=field.name,
                    existing_catalog_tags=existing_catalog_tags)