in common/data_mesh/src/data_mesh_client.py [0:0]
def annotate_bq_asset_schema(
self,
bq_asset_annotation_spec: dmt.BqAssetAnnotation,
deploy_descriptions: bool = True,
deploy_acls: bool = True) -> Set[str]:
"""Annotate the BQ asset schema with descriptions and policy tags.
Other existing schema fields (e.g. field type) are untouched. Doesn't
touch any fields or their field schemas (e.g. policy tag) if they are
not specified in the spec.
Args:
bq_asset_annotation_spec: data_mesh_types.BqAssetAnnotation that
specifies a list of asset and field descriptions to annotate.
deploy_descriptions: Bool indicating whether to deploy descriptions.
deploy_acls: Bool indicating whether to deploy policy tags.
Returns:
updated_annotations: Set of strings representing annotations
that were updated after overwrite setting is applied, e.g.
{"table_name.description", "table_name.field_name.description"}
"""
# Schema is always included in fields to update because we determine
# whether an update is needed per field + annotations, e.g. field
# description, field policy, etc.
fields_to_update = ["schema"]
updated_annotations = set()
asset_id = bq_asset_annotation_spec.name
try:
table = self._bq_client.get_table(asset_id)
except google_exc.NotFound:
# TODO: collect all skipped tables into single location/msg.
logging.warning("Asset '%s' not found, skipping annotations.",
asset_id)
return set()
# Conditionally set the table description.
if deploy_descriptions and self._should_overwrite(
table.description, bq_asset_annotation_spec.description):
fields_to_update.append("description")
table.description = bq_asset_annotation_spec.description
updated_annotations.add(f"{asset_id}:description")
# Update field schema.
original_schema = table.schema
annotation_map = {
field.name: field for field in bq_asset_annotation_spec.fields
}
table.schema = self._get_updated_schema(original_schema, annotation_map,
updated_annotations, asset_id,
"", deploy_descriptions,
deploy_acls)
_ = self._bq_client.update_table(table,
fields_to_update,
retry=self._retry_options)
return updated_annotations