def _maybe_set_policy_tag()

in common/data_mesh/src/data_mesh_client.py [0:0]


    def _maybe_set_policy_tag(self, asset_id: str,
                              field_spec: dmt.BqAssetFieldAnnotation,
                              field_repr: Dict[str, Any]) -> bool:
        """Return a bool indicating if the policy tag was set.

        This method sets the policy_tag by mutating the field_repr argument.
        """
        policy_tag_id = field_spec.policy_tag_id
        existing_policy_tag = field_repr.get("policyTags") or {}
        existing_policy_tag_names = existing_policy_tag.get("names")
        existing_policy_tag_id = None

        if existing_policy_tag_names:
            existing_policy_tag_name = existing_policy_tag_names[0]
            name_parts = existing_policy_tag_name.split("/")
            existing_taxonomy_name = "/".join(name_parts[:-2])
            try:
                existing_policy_tag_id = dmt.PolicyTagId(
                    self._policy_tag_client.get_policy_tag(
                        name=existing_policy_tag_name).display_name,
                    self._policy_tag_client.get_taxonomy(
                        name=existing_taxonomy_name).display_name)
            except google_exc.NotFound as found_error:
                # If the policy taxonomy isn't found, consider it not to exist.
                # This can happen if the taxonomy was recently deleted and
                # reference deletes haven't yet had time to propagate. This
                # occurs when overwriting. In those cases we ignore the
                # exception and just overwrite obsolete policies.
                details_str = "; ".join([str(d) for d in found_error.details])
                if not re.search(r"Requested taxonomy id of \w+ was not found",
                                 details_str):
                    raise

        if self._should_overwrite(existing_policy_tag_id,
                                  policy_tag_id,
                                  enable_delete=True):
            if policy_tag_id:
                project = asset_id.split(".")[0]
                policy_taxonomy_name = self._get_resource_name(
                    dmt.PolicyTaxonomy, policy_tag_id.taxonomy,
                    self._parent_project(project))
                policy_tag_name = self._get_resource_name(
                    dmt.PolicyTag, policy_tag_id.display_name,
                    policy_taxonomy_name)
                # Oddly the schema expects a list of policy tags even though
                # only 1 can be assigned to the column.
                field_repr["policyTags"] = {"names": [policy_tag_name]}
                policy_tag_id_str = ":".join(
                    [policy_tag_id.taxonomy, policy_tag_id.display_name])
            else:
                field_repr["policyTags"] = {"names": []}
                policy_tag_id_str = "None"

            existing_policy_tag_id_str = "None"
            if existing_policy_tag_id:
                existing_policy_tag_id_str = ":".join([
                    existing_policy_tag_id.taxonomy,
                    existing_policy_tag_id.display_name
                ])

            logging.info("Setting %s.%s policy tag from %s to %s", asset_id,
                         field_spec.name, existing_policy_tag_id_str,
                         policy_tag_id_str)
            return True
        return False