common/annotations_loader.py (107 lines of code) (raw):
# Copyright 2023 Google LLC
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
BigQuery Annotations loader
"""
import argparse
from concurrent import futures
import logging
import pathlib
import sys
import typing
import yaml
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
sys.path.append(".")
sys.path.append("./src")
sys.path.append(str(pathlib.Path(__file__).parent))
# pylint:disable=wrong-import-position
from common.py_libs.configs import load_config_file
from common.py_libs.jinja import (apply_jinja_params_dict_to_file,
initialize_jinja_from_config)
# Size of the thread pool that `load_annotations` uses to process the tables
# of one annotations file concurrently.
_PARALLEL_THREADS = 5
def _load_table_annotations(table_annotations: typing.Dict[str, typing.Any],
                            client: bigquery.Client):
    """Applies annotations (descriptions) to a single BigQuery table or view.

    Existing descriptions are never overwritten: an annotation is only used
    when the table (or field) currently has no description and the annotation
    itself is non-empty. If nothing needs to change, no update call is made.

    Args:
        table_annotations: Annotation item with an "id" key (full table id)
            and optional "description" (table description) and "fields"
            (list of dicts with "name" and optional "description") keys.
        client: BigQuery client used to read and update the table.
    """
    full_table_id = table_annotations["id"]

    try:
        table = client.get_table(full_table_id)
    except NotFound:
        logging.info("Table or view `%s` was not found. Skipping it.",
                     full_table_id)
        return

    # A missing or null "fields" entry is treated as "no field annotations".
    annotation_fields = {
        field_item["name"]: field_item.get("description")
        for field_item in table_annotations.get("fields") or []
    }

    schema = list(table.schema)
    schema_changed = False
    for index, field in enumerate(schema):
        description = annotation_fields.get(field.name)
        # Keep existing descriptions, and do not treat "no annotation" as a
        # change (an empty string vs. None must not trigger an update).
        if field.description or not description:
            continue
        # Schema fields are rebuilt from their API representation with the
        # new description rather than modified in place.
        field_dict = field.to_api_repr()
        field_dict["description"] = description
        schema[index] = bigquery.SchemaField.from_api_repr(field_dict)
        schema_changed = True

    changes = []
    if schema_changed:
        table.schema = schema
        changes.append("schema")

    table_description = table_annotations.get("description")
    if not table.description and table_description:
        table.description = table_description
        changes.append("description")

    if changes:
        # Only the listed properties are sent to BigQuery.
        client.update_table(table, changes)
        logging.info("Table/view `%s` has been updated.", full_table_id)
    else:
        logging.info("No changes in `%s`.", full_table_id)
def load_annotations(jinja_dict: typing.Dict[str, typing.Any],
                     client: bigquery.Client, annotations_file: pathlib.Path):
    """Loads all table annotations defined in one annotations file.

    The file is rendered with `apply_jinja_params_dict_to_file`, parsed as
    YAML, and every item of the parsed content is processed by
    `_load_table_annotations` on a thread pool.

    A failure while processing one item is logged as an error and does not
    prevent the remaining items from being processed; this function does not
    re-raise worker exceptions.

    Args:
        jinja_dict: Jinja parameters used to render the annotations file.
        client: BigQuery client shared by all worker threads.
        annotations_file: Path to the annotations file.
    """
    annotations_yaml = apply_jinja_params_dict_to_file(annotations_file,
                                                       jinja_dict)
    annotations_dict = yaml.safe_load(annotations_yaml)
    if not annotations_dict:
        logging.warning("Annotations file `%s` has no parsable content.",
                        str(annotations_file))
        return

    logging.info("Loading annotations from `%s`.", str(annotations_file))

    # The context manager waits for all submitted work and releases the
    # worker threads when the block exits.
    with futures.ThreadPoolExecutor(_PARALLEL_THREADS) as executor:
        pending = {
            executor.submit(_load_table_annotations, table_item, client):
                table_item for table_item in annotations_dict
        }
        for future in futures.as_completed(pending):
            error = future.exception()
            if error is None:
                continue
            # Surface worker failures instead of silently dropping them.
            table_item = pending[future]
            table_id = (table_item.get("id")
                        if isinstance(table_item, dict) else table_item)
            logging.error(
                "Failed to load annotations for `%s` from `%s`: %s",
                table_id, str(annotations_file), error)
def main(args: typing.Sequence[str]) -> int:
    """BigQuery Annotations loader main.

    Parses command line arguments, loads the configuration file, and loads
    every regular file found directly inside the annotations directory.

    Args:
        args: Command line arguments, without the program name.

    Returns:
        0 on success, 1 if the annotations directory doesn't exist or is not
        a directory.
    """
    parser = argparse.ArgumentParser(description="BigQuery Annotations Loader")
    parser.add_argument("--annotations-directory",
                        help="Annotation files directory",
                        type=str,
                        required=True)
    parser.add_argument("--debug",
                        help="Debugging mode.",
                        action="store_true",
                        default=False,
                        required=False)
    parser.add_argument("--config",
                        help="Data Foundation config.json.",
                        type=str,
                        required=False,
                        default="./config/config.json")
    options = parser.parse_args(args)

    logging.basicConfig(
        format="%(asctime)s | %(levelname)s | %(message)s",
        level=logging.INFO if not options.debug else logging.DEBUG,
    )

    logging.info("Cortex Annotations Loader for BigQuery.")
    config = load_config_file(options.config)

    logging.info("Loading BigQuery Annotations.")
    annotations_path = pathlib.Path(options.annotations_directory)
    if not annotations_path.exists():
        logging.critical("Directory `%s` doesn't exist.", str(annotations_path))
        return 1
    # An existing regular file would otherwise make `iterdir()` raise.
    if not annotations_path.is_dir():
        logging.critical("`%s` is not a directory.", str(annotations_path))
        return 1

    client = bigquery.Client(project=config["projectIdSource"],
                             location=config["location"])
    jinja_dict = initialize_jinja_from_config(config)

    # Sorted for a deterministic processing (and logging) order.
    for annotation_file in sorted(annotations_path.iterdir()):
        # Sub-directories and other non-file entries cannot be rendered as
        # annotation files.
        if not annotation_file.is_file():
            logging.debug("Skipping `%s`: not a regular file.",
                          str(annotation_file))
            continue
        load_annotations(jinja_dict, client, annotation_file.absolute())

    logging.info("BigQuery Annotations has been loaded!")
    return 0
###############################################################
if __name__ == "__main__":
    # Script entry point: the value returned by `main` becomes the process
    # exit status.
    raise SystemExit(main(sys.argv[1:]))