in common/data_mesh/src/data_mesh_client.py [0:0]
def _get_matching_asset_policies(
self, asset_catalog_tags: List[dmt.CatalogTag],
templates_spec: dmt.CatalogTagTemplates) -> List[dmt.BqAssetPolicy]:
templates_spec_map = {
template.display_name: template
for template in templates_spec.templates
}
asset_policies = []
for catalog_tag in asset_catalog_tags:
template_spec = templates_spec_map.get(catalog_tag.display_name)
if not template_spec:
raise cortex_exc.NotFoundCError(
"No matching template found for CatalogTag: "
f"'{catalog_tag.display_name}'.")
for asset_policy in template_spec.asset_policies:
if self._is_matching_policy(catalog_tag, template_spec,
asset_policy.filters):
asset_policies.append(asset_policy.policy)
return asset_policies