common/py_libs/schema_reader.py (29 lines of code) (raw):

# Copyright 2024 Google LLC # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # https://www.apache.org/licenses/LICENSE-2.0 # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License foSr the specific language governing permissions and # limitations under the License. """Utility functions for working with BQ Tables and mapping files.""" import csv import logging from pathlib import Path from google.cloud.bigquery import SchemaField def read_field_type_mapping(mapping_file: Path, schema_target_field: str, system_fields: dict[str, str], schema_bq_datatype_field = None) -> dict: """Read field name to target data type mappings from file. Args: mapping_file (Path): Mapping file location. schema_target_field (str): Name of the column in mapping file with target field names. system_fields (dict[str, str]): Dict with system field names and datatypes. schema_bq_datatype_field: Name of the column in mapping file with target datatypes. Returns: Dict: Mapping dictionary where keys are the target field names and the values are the target data types. """ field_mapping = {} with open(mapping_file, encoding="utf-8", newline="") as f: for row in csv.DictReader(f, delimiter=","): # All input values in raw layer schema are string. if schema_bq_datatype_field: field_type = row[schema_bq_datatype_field] else: field_type = "STRING" field_mapping[row[schema_target_field]] = field_type field_mapping = {**field_mapping, **system_fields} return field_mapping def read_bq_schema(mapping_file: Path, schema_target_field: str, system_fields: dict[str, str], schema_bq_datatype_field = None) -> list[SchemaField]: """Reads BQ Schema from file and adds additional fields. Args: mapping_file (Path): Schema mapping file path. schema_target_field (str): Name of the column in mapping file with target field names. system_fields (dict[str, str]): Dict with system field names and datatypes. schema_bq_datatype_field: Name of the column in mapping file with target datatypes. Return: BQ schema as a list of fields and datatypes. """ bq_schema = [] field_mapping = read_field_type_mapping(mapping_file, schema_target_field, system_fields, schema_bq_datatype_field) for column_name, column_type in field_mapping.items(): bq_schema.append(SchemaField(name=column_name, field_type=column_type)) logging.debug("\n".join([repr(field) for field in bq_schema])) return bq_schema