in common/data_mesh/src/data_mesh_client.py [0:0]
def _maybe_set_policy_tag(self, asset_id: str,
                          field_spec: dmt.BqAssetFieldAnnotation,
                          field_repr: Dict[str, Any]) -> bool:
    """Apply the policy tag requested by field_spec to field_repr if needed.

    The policy tag currently referenced by field_repr["policyTags"] (if
    any) is resolved through the policy tag client and passed, together
    with field_spec.policy_tag_id, to self._should_overwrite. When that
    call asks for an overwrite, field_repr is mutated in place.

    Args:
        asset_id: Dot-separated asset identifier; the part before the
            first "." is used as the project.
        field_spec: Annotation supplying the desired policy_tag_id and the
            field name used in the log message.
        field_repr: Field representation dict whose "policyTags" entry is
            read and possibly replaced.

    Returns:
        True if field_repr["policyTags"] was set, False otherwise.

    Raises:
        google_exc.NotFound: If looking up the existing policy tag fails
            for a reason other than the "taxonomy not found" message.
    """
    desired_tag_id = field_spec.policy_tag_id

    # Resolve the tag currently attached to the field, if there is one.
    current_names = (field_repr.get("policyTags") or {}).get("names")
    current_tag_id = None
    if current_names:
        # Only the first listed name is inspected.
        current_tag_name = current_names[0]
        # The taxonomy is looked up under the tag name minus its last two
        # "/"-separated segments.
        current_taxonomy_name = "/".join(current_tag_name.split("/")[:-2])
        try:
            tag_display_name = self._policy_tag_client.get_policy_tag(
                name=current_tag_name).display_name
            taxonomy_display_name = self._policy_tag_client.get_taxonomy(
                name=current_taxonomy_name).display_name
            current_tag_id = dmt.PolicyTagId(tag_display_name,
                                             taxonomy_display_name)
        except google_exc.NotFound as not_found:
            # A missing taxonomy is treated as "no existing tag". This can
            # happen when the taxonomy was recently deleted and reference
            # deletes have not propagated yet, e.g. while overwriting. The
            # obsolete policy is then simply overwritten. Any other
            # NotFound is propagated unchanged.
            detail_text = "; ".join(map(str, not_found.details))
            if re.search(r"Requested taxonomy id of \w+ was not found",
                         detail_text) is None:
                raise

    if not self._should_overwrite(current_tag_id,
                                  desired_tag_id,
                                  enable_delete=True):
        return False

    if desired_tag_id:
        project = asset_id.partition(".")[0]
        taxonomy_resource = self._get_resource_name(
            dmt.PolicyTaxonomy, desired_tag_id.taxonomy,
            self._parent_project(project))
        tag_resource = self._get_resource_name(
            dmt.PolicyTag, desired_tag_id.display_name, taxonomy_resource)
        # The schema takes a list of policy tags even though a column can
        # only have one assigned.
        field_repr["policyTags"] = {"names": [tag_resource]}
        new_label = ":".join(
            (desired_tag_id.taxonomy, desired_tag_id.display_name))
    else:
        # No tag requested: clear whatever is there.
        field_repr["policyTags"] = {"names": []}
        new_label = "None"

    old_label = (":".join(
        (current_tag_id.taxonomy, current_tag_id.display_name))
                 if current_tag_id else "None")

    logging.info("Setting %s.%s policy tag from %s to %s", asset_id,
                 field_spec.name, old_label, new_label)
    return True