def annotate_bq_asset_schema()

in common/data_mesh/src/data_mesh_client.py [0:0]


    def annotate_bq_asset_schema(
            self,
            bq_asset_annotation_spec: dmt.BqAssetAnnotation,
            deploy_descriptions: bool = True,
            deploy_acls: bool = True) -> Set[str]:
        """Apply descriptions and policy tags from a spec to a BQ asset.

        The asset named by the spec is fetched, its table description is
        optionally replaced, and its schema is rebuilt through
        `_get_updated_schema` before the table is pushed back to BigQuery.
        Schema attributes other than the annotations (e.g. field type) are
        left alone, and fields that the spec does not mention keep their
        current field schemas (e.g. policy tag).

        Args:
            bq_asset_annotation_spec: data_mesh_types.BqAssetAnnotation naming
                the asset and carrying the asset description plus the
                per-field annotations to apply.
            deploy_descriptions: When False, the table description is never
                modified here; the flag is also forwarded to
                `_get_updated_schema`.
            deploy_acls: Forwarded to `_get_updated_schema`; indicates whether
                policy tags should be deployed.

        Returns:
            Set of strings identifying the annotations that were actually
            changed once the overwrite decision has been applied. An
            overwritten table description is recorded as
            "<asset name>:description"; field-level entries are added by
            `_get_updated_schema` (presumably in a similar
            "<asset>...<field>...description" form -- confirm there). An empty
            set is returned when the asset does not exist.
        """
        target_asset = bq_asset_annotation_spec.name

        # A missing asset is not an error: warn and report nothing changed.
        try:
            bq_table = self._bq_client.get_table(target_asset)
        except google_exc.NotFound:
            # TODO: collect all skipped tables into single location/msg.
            logging.warning("Asset '%s' not found, skipping annotations.",
                            target_asset)
            return set()

        changed = set()
        # "schema" is always part of the update request: whether anything
        # needs to change is decided per field and per annotation kind (field
        # description, field policy, ...) while the schema is rebuilt below.
        update_mask = ["schema"]

        # Table-level description, only when requested and only when the
        # overwrite check allows replacing the current value.
        if deploy_descriptions:
            if self._should_overwrite(bq_table.description,
                                      bq_asset_annotation_spec.description):
                bq_table.description = bq_asset_annotation_spec.description
                update_mask.append("description")
                changed.add(f"{target_asset}:description")

        # Field-level annotations: index the spec's fields by name and let the
        # helper produce the new schema, recording its changes in `changed`.
        current_schema = bq_table.schema
        field_specs_by_name = {
            field_spec.name: field_spec
            for field_spec in bq_asset_annotation_spec.fields
        }
        bq_table.schema = self._get_updated_schema(
            current_schema, field_specs_by_name, changed, target_asset, "",
            deploy_descriptions, deploy_acls)

        self._bq_client.update_table(bq_table,
                                     update_mask,
                                     retry=self._retry_options)
        return changed