def from_file()

in bigquery_etl/metadata/parse_metadata.py [0:0]


    def from_file(cls, metadata_file):
        """Parse metadata from the provided file and create a new Metadata instance.

        Every top-level key of the YAML document is optional. A missing
        ``friendly_name`` is derived from the name of the directory that
        contains ``metadata_file``; a missing ``description`` falls back to a
        placeholder text. An empty document is treated like an empty mapping.

        Labels are collected from three places, in this order:

        * the ``labels`` mapping (``True`` becomes an empty-string tag,
          ``False`` is dropped, lists are stringified element-wise and any
          other value is stringified),
        * ``scheduling.dag_name``, stored as ``dag`` when it is a valid label,
        * the part before ``@`` of every owner accepted by ``is_email``,
          stored as ``owner1``, ``owner2``, ... when it is a valid label.

        Args:
            metadata_file: Path of the YAML metadata file to read.

        Returns:
            A new instance of ``cls`` populated from the file contents.

        Raises:
            OSError: If the file cannot be opened.
            yaml.YAMLError: If the file does not contain valid YAML.
        """
        # Keep the file open only for the read itself; YAML errors propagate
        # to the caller unchanged.
        with open(metadata_file, "r") as yaml_stream:
            metadata = yaml.safe_load(yaml_stream)

        if metadata is None:
            # An empty (or comment-only) file loads as None; handle it the
            # same way as an explicit `{}` document instead of failing with
            # an AttributeError on `None.get`.
            metadata = {}

        # One converter is enough for every structured section below.
        converter = cattrs.BaseConverter()

        table_name = str(Path(metadata_file).parent.name)
        friendly_name = metadata.get(
            "friendly_name", string.capwords(table_name.replace("_", " "))
        )
        description = metadata.get(
            "description",
            "Please provide a description for the query",
        )

        labels = {}
        for key, label in metadata.get("labels", {}).items():
            if isinstance(label, bool):
                # publish key-value pair with bool value as tag
                if label:
                    labels[str(key)] = ""
            elif isinstance(label, list):
                labels[str(key)] = [str(item) for item in label]
            else:
                # all other pairs get published as key-value pair label
                labels[str(key)] = str(label)

        scheduling = metadata.get("scheduling", {})
        if "dag_name" in scheduling and cls.is_valid_label(scheduling["dag_name"]):
            labels["dag"] = scheduling["dag_name"]

        bigquery = None
        if metadata.get("bigquery"):
            # a present-but-empty `bigquery` section is treated as absent
            bigquery = converter.structure(metadata["bigquery"], BigQueryMetadata)

        owners = metadata.get("owners", [])
        owner_idx = 1
        for owner in filter(is_email, owners):
            label = owner.split("@")[0]
            # only owners with a usable label consume an index, so the
            # resulting owner<N> keys stay gap-free
            if cls.is_valid_label(label):
                labels[f"owner{owner_idx}"] = label
                owner_idx += 1

        schema = None
        if "schema" in metadata:
            schema = converter.structure(metadata["schema"], SchemaMetadata)

        workgroup_access = None
        if "workgroup_access" in metadata:
            workgroup_access = converter.structure(
                metadata["workgroup_access"], List[WorkgroupAccessMetadata]
            )

        references = metadata.get("references", {})

        external_data = None
        if "external_data" in metadata:
            external_data = converter.structure(
                metadata["external_data"], ExternalDataMetadata
            )

        deprecated = metadata.get("deprecated", False)
        deletion_date = metadata.get("deletion_date")

        monitoring = None
        if "monitoring" in metadata:
            monitoring = converter.structure(
                metadata["monitoring"], MonitoringMetadata
            )

            if "partition_column" in metadata["monitoring"]:
                # check if partition column metadata has been set explicitly;
                # needed for monitoring config validation for views where partition
                # column needs to be set explicitly
                monitoring.partition_column_set = True

        require_column_descriptions = metadata.get(
            "require_column_descriptions", False
        )

        return cls(
            friendly_name,
            description,
            owners,
            labels,
            scheduling,
            bigquery,
            schema,
            workgroup_access,
            references,
            external_data,
            deprecated,
            deletion_date,
            monitoring,
            require_column_descriptions,
        )