common/big_query/schema_provider.py (152 lines of code) (raw):

# Copyright 2025 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ This module provides a SchemaProvider class for managing and retrieving schema information for BigQuery tables. It includes predefined schemas for specific tables and methods to access these schemas. Classes: - SchemaProvider: A class for managing and retrieving schema information for BigQuery tables. """ from typing import Dict, Any from google.cloud import bigquery from common.big_query.big_query_exceptions import BigQuerySchemaNotFoundError class SchemaProvider: """ A provider class for managing and retrieving schema information for BigQuery tables. """ def __init__(self): """ Initializes the SchemaProvider with predefined schemas for specific tables. """ self.tables = { "tag_templates": { "schema": [ bigquery.SchemaField( name="resourceName", field_type="STRING", mode="REQUIRED", description=( "Format: projects/:project/locations/" ":location/tagTemplates/:tagTemplateId" ), ), bigquery.SchemaField( name="dataplexResourceName", field_type="STRING", description=( "Format: projects/:project/locations/" "global/aspectTypes/:aspectTypeId" ), ), bigquery.SchemaField( name="projectId", field_type="STRING", mode="REQUIRED" ), bigquery.SchemaField( name="location", field_type="STRING", mode="REQUIRED" ), bigquery.SchemaField( name="tagTemplateId", field_type="STRING", mode="REQUIRED", ), bigquery.SchemaField( name="managingSystem", field_type="STRING", mode="REQUIRED", description="Either DATA_CATALOG or DATAPLEX", ), bigquery.SchemaField( name="isPubliclyReadable", field_type="BOOL" ), bigquery.SchemaField( name="createdAt", field_type="DATE", mode="REQUIRED" ), ], "is_partitioned": True, "partition_column": "createdAt", "require_partition_filter": True, }, "entry_groups": { "schema": [ bigquery.SchemaField( name="resourceName", field_type="STRING", mode="REQUIRED", description=( "Format: projects/:project/locations/" ":location/entryGroups/:entryGroupId" ), ), bigquery.SchemaField( name="dataplexResourceName", field_type="STRING", description=( "Format: projects/:project/locations/" ":location/entryGroups/:entryGroupId" ), ), bigquery.SchemaField( name="projectId", field_type="STRING", mode="REQUIRED" ), bigquery.SchemaField( name="location", field_type="STRING", mode="REQUIRED" ), bigquery.SchemaField( name="entryGroupId", field_type="STRING", mode="REQUIRED", ), bigquery.SchemaField( name="managingSystem", field_type="STRING", mode="REQUIRED", description="Either DATA_CATALOG or DATAPLEX", ), bigquery.SchemaField( name="createdAt", field_type="DATE", mode="REQUIRED" ), ], "is_partitioned": True, "partition_column": "createdAt", "require_partition_filter": True, }, "projects": { "schema": [ bigquery.SchemaField( name="projectId", field_type="STRING", mode="REQUIRED", ), bigquery.SchemaField( name="projectNumber", field_type="INTEGER", mode="REQUIRED", ), bigquery.SchemaField( name="isDataCatalogApiEnabled", field_type="BOOLEAN", mode="REQUIRED", ), bigquery.SchemaField( name="isDataplexApiEnabled", field_type="BOOLEAN", mode="REQUIRED", ), bigquery.SchemaField( name="ancestry", field_type="RECORD", mode="REPEATED", fields=[ bigquery.SchemaField( name="type", field_type="STRING", mode="REQUIRED", ), bigquery.SchemaField( name="id", field_type="STRING", mode="REQUIRED", ), ], ), bigquery.SchemaField( name="createdAt", field_type="DATE", mode="REQUIRED" ), ], "is_partitioned": True, "partition_column": "createdAt", "require_partition_filter": True, }, } def get_table_metadata(self, table_name: str) -> Dict[str, Any]: """ Retrieves the metadata for a specified table. """ table_metadata = self.tables.get(table_name) if table_metadata: return table_metadata raise BigQuerySchemaNotFoundError( f"Schema not found for table {table_name}" )