def _get_updated_schema()

in src/common/data_mesh/src/data_mesh_client.py [0:0]


    def _get_updated_schema(
            self,
            original_schema: Sequence[bigquery.SchemaField],
            annotation_map: Dict[str, dmt.BqAssetFieldAnnotation],
            updated_annotations: Set[str],
            asset_id: str,
            parent_field: str = "",
            deploy_descriptions: bool = True,
            deploy_acls: bool = True) -> List[bigquery.SchemaField]:
        """Return an updated schema including nested fields.

        Args:
            original_schema: Bigquery sequence of SchemaFields before updates.
                This list can represent a table or nested subfields.
            annotation_map: Dict mapping full field names to annotations.
            updated_annotations: Mutated set of strings representing annotations
                that were updated after overwrite setting is applied, e.g.
                {"table_name.field_name.description"}
            asset_id: Asset ID to be returned.
            parent_field: Parent field.
            deploy_descriptions: Bool indicating whether to deploy descriptions.
            deploy_acls: Bool indicating whether to deploy policy tags.

        Returns:
            List of SchemaFields representing the updated table or subfields.
        """
        new_schema = []
        for field in original_schema:
            if parent_field:
                full_field_name = f"{parent_field}.{field.name}"
            else:
                full_field_name = field.name
            qualified_field_name = f"{asset_id}.{full_field_name}"
            field_annotation = annotation_map.get(full_field_name)

            # Warn if no annotation exists, but could still have annotations for
            # child fields.
            if not field_annotation:
                logging.warning("No annotation found for %s.",
                                qualified_field_name)

            field_repr = field.to_api_repr()

            # Conditionally set the field description.
            if (field_annotation and deploy_descriptions and
                    self._should_overwrite(field_repr.get("description"),
                                           field_annotation.description)):
                field_repr["description"] = field_annotation.description
                updated_annotations.add(f"{qualified_field_name}:description")

            # Conditionally set the policy tag.
            if field_annotation and deploy_acls and self._maybe_set_policy_tag(
                    asset_id, field_annotation, field_repr):
                updated_annotations.add(f"{qualified_field_name}:policy_tag")

            # Recursively update children
            if field.fields:
                updated_children = self._get_updated_schema(
                    field.fields, annotation_map, updated_annotations, asset_id,
                    full_field_name, deploy_descriptions, deploy_acls)
                field_repr["fields"] = [
                    c.to_api_repr() for c in updated_children
                ]

            new_field = bigquery.SchemaField.from_api_repr(field_repr)
            new_schema.append(new_field)

        return new_schema