in bigquery_etl/metadata/parse_metadata.py [0:0]
def from_file(cls, metadata_file):
    """Parse metadata from the provided file and create a new Metadata instance.

    Top-level keys that are absent from the YAML document fall back to
    defaults: the friendly name is derived from the name of the directory
    containing ``metadata_file`` and the description is a placeholder text.
    An empty YAML document is treated as a document without any keys.

    In addition to the entries of the ``labels`` mapping, labels are derived
    from the scheduling ``dag_name`` (label ``dag``) and from the part before
    the ``@`` of every owner accepted by ``is_email`` (labels ``owner1``,
    ``owner2``, ...), provided the value passes ``is_valid_label``.

    Args:
        metadata_file: Path of the metadata YAML file to read.

    Returns:
        A new instance of ``cls`` populated from the file content.

    Raises:
        yaml.YAMLError: If the file content is not valid YAML.
    """
    # Keep the file open only while loading; YAML errors propagate unchanged.
    with open(metadata_file, "r", encoding="utf-8") as yaml_stream:
        metadata = yaml.safe_load(yaml_stream)

    # safe_load yields None for an empty document; treat it as "no keys set".
    if metadata is None:
        metadata = {}

    # A single converter is sufficient: no custom hooks are registered on it.
    converter = cattrs.BaseConverter()

    def structure_section(key, target_type):
        """Structure the section stored under ``key``; None if it is absent."""
        if key not in metadata:
            return None
        return converter.structure(metadata[key], target_type)

    table_name = Path(metadata_file).parent.name
    friendly_name = metadata.get(
        "friendly_name", string.capwords(table_name.replace("_", " "))
    )
    description = metadata.get(
        "description",
        "Please provide a description for the query",
    )

    labels = {}
    for key, label in metadata.get("labels", {}).items():
        if isinstance(label, bool):
            # publish key-value pair with bool value as tag;
            # pairs with a False value are not published at all
            if label:
                labels[str(key)] = ""
        elif isinstance(label, list):
            labels[str(key)] = list(map(str, label))
        else:
            # all other pairs get published as key-value pair label
            labels[str(key)] = str(label)

    scheduling = metadata.get("scheduling", {})
    if "dag_name" in scheduling and cls.is_valid_label(scheduling["dag_name"]):
        labels["dag"] = scheduling["dag_name"]

    # Unlike the other structured sections, an empty/null "bigquery" entry
    # is skipped rather than structured.
    bigquery = None
    if metadata.get("bigquery"):
        bigquery = converter.structure(metadata["bigquery"], BigQueryMetadata)

    owners = metadata.get("owners", [])
    # The index only advances for owners that actually produce a label, so
    # the generated owner labels are numbered without gaps.
    owner_idx = 1
    for owner in filter(is_email, owners):
        label = owner.split("@")[0]
        if cls.is_valid_label(label):
            labels[f"owner{owner_idx}"] = label
            owner_idx += 1

    schema = structure_section("schema", SchemaMetadata)
    workgroup_access = structure_section(
        "workgroup_access", List[WorkgroupAccessMetadata]
    )
    references = metadata.get("references", {})
    external_data = structure_section("external_data", ExternalDataMetadata)
    deprecated = metadata.get("deprecated", False)
    deletion_date = metadata.get("deletion_date")

    monitoring = None
    if "monitoring" in metadata:
        monitoring = converter.structure(metadata["monitoring"], MonitoringMetadata)
        if "partition_column" in metadata["monitoring"]:
            # check if partition column metadata has been set explicitly;
            # needed for monitoring config validation for views where partition
            # column needs to be set explicitly
            monitoring.partition_column_set = True

    require_column_descriptions = metadata.get("require_column_descriptions", False)

    return cls(
        friendly_name,
        description,
        owners,
        labels,
        scheduling,
        bigquery,
        schema,
        workgroup_access,
        references,
        external_data,
        deprecated,
        deletion_date,
        monitoring,
        require_column_descriptions,
    )