in src/common/data_mesh/src/data_mesh_client.py [0:0]
def _get_updated_schema(
        self,
        original_schema: Sequence[bigquery.SchemaField],
        annotation_map: Dict[str, dmt.BqAssetFieldAnnotation],
        updated_annotations: Set[str],
        asset_id: str,
        parent_field: str = "",
        deploy_descriptions: bool = True,
        deploy_acls: bool = True) -> List[bigquery.SchemaField]:
    """Rebuild a schema with annotations applied, recursing into subfields.

    Every field is converted to its API representation, optionally
    patched with the description and policy tag from its annotation, and
    converted back into a new SchemaField. The input schema objects are
    not modified.

    Args:
        original_schema: SchemaFields of a table, or the subfields of one
            nested field when called recursively.
        annotation_map: Annotations keyed by full field name, i.e. the
            dot-separated path from the top-level field
            (e.g. "parent.child").
        updated_annotations: Set that is mutated in place. An entry of
            the form "<asset_id>.<full field name>:description" or
            "<asset_id>.<full field name>:policy_tag" is added for each
            value this call changes.
        asset_id: Asset ID used to qualify field names in log messages
            and in updated_annotations; also forwarded to
            _maybe_set_policy_tag.
        parent_field: Full name of the field that owns original_schema,
            or "" when original_schema is the top level.
        deploy_descriptions: Whether field descriptions may be updated.
            The overwrite decision itself is made by _should_overwrite.
        deploy_acls: Whether _maybe_set_policy_tag is invoked for
            annotated fields.

    Returns:
        A new list of SchemaFields, in the same order as original_schema.
    """
    rebuilt = []
    for column in original_schema:
        # Annotations are looked up by the dotted path from the root field.
        path = (f"{parent_field}.{column.name}"
                if parent_field else column.name)
        qualified = f"{asset_id}.{path}"
        annotation = annotation_map.get(path)

        # A missing annotation only warrants a warning: nested children
        # are looked up independently and may still be annotated.
        if not annotation:
            logging.warning("No annotation found for %s.", qualified)

        api_repr = column.to_api_repr()

        if annotation:
            # Description: replaced only when _should_overwrite agrees.
            if deploy_descriptions and self._should_overwrite(
                    api_repr.get("description"), annotation.description):
                api_repr["description"] = annotation.description
                updated_annotations.add(f"{qualified}:description")

            # Policy tag: the helper receives the representation itself;
            # a truthy result is recorded as an update.
            if deploy_acls and self._maybe_set_policy_tag(
                    asset_id, annotation, api_repr):
                updated_annotations.add(f"{qualified}:policy_tag")

        # Descend into nested fields, using this field as their parent.
        if column.fields:
            nested = self._get_updated_schema(
                column.fields,
                annotation_map,
                updated_annotations,
                asset_id,
                parent_field=path,
                deploy_descriptions=deploy_descriptions,
                deploy_acls=deploy_acls)
            api_repr["fields"] = [child.to_api_repr() for child in nested]

        rebuilt.append(bigquery.SchemaField.from_api_repr(api_repr))
    return rebuilt