in common/data_mesh/src/data_mesh_client.py [0:0]
def create_catalog_tags(self,
                        bq_asset_annotation_spec: dmt.BqAssetAnnotation,
                        templates_spec: dmt.CatalogTagTemplates) -> None:
    """Attach the catalog tags described by an annotation spec to a BQ asset.

    Tags listed directly on the asset are processed first, followed by the
    tags listed on each of its fields. If the catalog lookup for the asset
    raises PermissionDenied, a warning is logged and nothing is created.

    Args:
        bq_asset_annotation_spec: data_mesh_types.BqAssetAnnotation whose
            name has the form "<project>.<dataset>.<table>" and which lists
            the asset-level and field-level catalog tags to create.
        templates_spec: data_mesh_types.CatalogTagTemplates holding every
            template referenced by the tags in the annotation spec.

    Raises:
        ValueError: If the asset name does not split into exactly three
            dot-separated parts.
        KeyError: If a tag's display name has no matching template in
            templates_spec.
    """
    asset_name = bq_asset_annotation_spec.name
    project, dataset, table = asset_name.split(".")
    linked_resource = get_bq_entry_name(project, dataset, table)

    try:
        table_entry = self._catalog_client.lookup_entry(
            request={"linked_resource": linked_resource})
    except google_exc.PermissionDenied:
        # TODO: collect all skipped tables into single location/msg.
        logging.warning(
            "Permission denied on asset '%s' or it does not exist, "
            "skipping catalog tags.", asset_name)
        return

    # Index the templates by display name; tags refer to them by that name.
    templates_by_name = {}
    for template in templates_spec.templates:
        templates_by_name[template.display_name] = template

    entry_resource = table_entry.name

    # Listed once up front; includes tags at both the asset and field levels.
    tags_already_present = self._list_resources(dmt.CatalogTag,
                                                entry_resource)

    def _tag_entry(tag, **placement):
        # The template is resolved per tag, right before its creation call,
        # so a missing template only fails after earlier tags were handled.
        template = templates_by_name[tag.display_name]
        self.create_catalog_tag(project,
                                tag,
                                template,
                                entry_resource,
                                existing_catalog_tags=tags_already_present,
                                **placement)

    # TODO: check if tag is used at the wrong level (field vs asset).
    for asset_tag in bq_asset_annotation_spec.catalog_tags:
        _tag_entry(asset_tag)

    # Field-level tags additionally carry the column they apply to.
    for field in bq_asset_annotation_spec.fields:
        for field_tag in field.catalog_tags:
            _tag_entry(field_tag, column=field.name)