common/materializer/create_bq_object.py

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Creates a BQ object (table or view) or executes a SQL script based on the
input parameters."""

# We use errors from exceptions and surface them to logs when relevant.
# There is no need to raise from exceptions again. It can get confusing to
# our customers during deployment errors. Disabling those linting errors.
#pylint: disable=raise-missing-from

#TODO: Remove these. Only here to pass presubmit for initial changes.
#pylint: disable=redundant-returns-doc
#pylint: disable=unused-import

import argparse
import json
import logging
from pathlib import Path
import sys

from google.cloud.exceptions import BadRequest

from common.materializer import generate_assets
from common.py_libs import bq_helper
from common.py_libs import bq_materializer
from common.py_libs import cortex_bq_client
from common.py_libs import jinja

# NOTE: All paths here are relative to the root directory, unless specified
# otherwise.

# Directory where this file resides.
_THIS_DIR = Path(__file__).resolve().parent

# Directory containing various template files.
_TEMPLATE_DIR = Path(_THIS_DIR, "templates")


def _parse_args() -> tuple[str, str, str, str, dict, bool, bool, str, bool]:
    """Parses, validates and returns arguments, and sets up logging."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--module_name",
        type=str,
        required=True,
        help="Module for which to generate BQ table/view. Required.")
    parser.add_argument(
        "--jinja_data_file",
        type=str,
        required=True,
        help=("Jinja data file containing replacement values for the "
              "sql_file, with relative path. Required."))
    parser.add_argument(
        "--target_dataset_type",
        type=str,
        required=False,
        default="Reporting",
        help=("Type of dataset (CDC/Reporting) for which this table or view "
              "is created. Default value is 'Reporting'."))
    parser.add_argument(
        "--target_dataset",
        type=str,
        required=True,
        help=("Full name of the BigQuery dataset in which this table or view "
              "will be created. Required."))
    parser.add_argument(
        "--bq_object_setting",
        type=str,
        required=True,
        help=("BQ object setting dictionary, containing the value "
              "corresponding to the entry in the materializer settings file "
              "for the given table. Required."))
    parser.add_argument(
        "--load_test_data",
        default=False,
        action="store_true",
        help="Flag to indicate if test data should be loaded in the tables.")
    parser.add_argument(
        "--debug",
        default=False,
        action="store_true",
        help="Flag to set log level to DEBUG. Default is INFO.")
    parser.add_argument(
        "--allow_telemetry",
        default=False,
        action="store_true",
        help="Flag to indicate if telemetry is allowed.")
    parser.add_argument(
        "--location",
        type=str,
        required=True,
        help="Location to pass to BigQueryInsertJob operators in DAGs.")
    parser.add_argument(
        "--skip_dag",
        default=False,
        action="store_true",
        help="Flag to indicate if Composer DAG should not be generated.")

    args = parser.parse_args()

    enable_debug = args.debug
    logging.basicConfig(level=logging.DEBUG if enable_debug else logging.INFO)

    module_name = args.module_name
    jinja_data_file = args.jinja_data_file
    target_dataset_type = args.target_dataset_type.lower()
    target_dataset = args.target_dataset
    bq_object_setting_str = args.bq_object_setting
    load_test_data = args.load_test_data
    allow_telemetry = args.allow_telemetry
    location = args.location
    skip_dag = args.skip_dag

    logging.info("Arguments:")
    logging.info(" module_name = %s", module_name)
    logging.info(" jinja_data_file = %s", jinja_data_file)
    logging.info(" target_dataset_type = %s", target_dataset_type)
    logging.info(" target_dataset = %s", target_dataset)
    logging.info(" bq_object_setting_str = %s", bq_object_setting_str)
    logging.info(" load_test_data = %s", load_test_data)
    logging.info(" debug = %s", enable_debug)
    logging.info(" allow_telemetry = %s", allow_telemetry)
    logging.info(" location = %s", location)
    logging.info(" skip_dag = %s", skip_dag)

    if not Path(jinja_data_file).is_file():
        raise ValueError(
            f"🛑 jinja_data_file '{jinja_data_file}' does not exist.")

    try:
        bq_object_setting = json.loads(bq_object_setting_str)
    except Exception as e:
        raise ValueError(f"🛑 Failed to read table settings. Error = {e}.")

    return (module_name, jinja_data_file, target_dataset_type, target_dataset,
            bq_object_setting, load_test_data, allow_telemetry, location,
            skip_dag)
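
# Illustrative invocation (a sketch only; the module name, file paths, dataset
# and --bq_object_setting payload below are hypothetical and depend on your
# deployment and materializer settings file — the flags themselves match the
# parser above):
#
#   python -m common.materializer.create_bq_object \
#       --module_name "sap" \
#       --jinja_data_file "config/jinja_data.json" \
#       --target_dataset_type "reporting" \
#       --target_dataset "my-project.my_reporting_ds" \
#       --bq_object_setting '{"sql_file": "src/reporting/my_view.sql", "type": "view"}' \
#       --location "US"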
Default is WARNING") parser.add_argument("--allow_telemetry", default=False, action="store_true", help="Flag to indicate if telemetry is allowed.") parser.add_argument( "--location", type=str, required=True, help="Location to pass to BigQueryInsertJob operators in DAGs.") parser.add_argument( "--skip_dag", default=False, action="store_true", help="Flag to indicate if Composer DAG should not be generated.") args = parser.parse_args() enable_debug = args.debug logging.basicConfig(level=logging.DEBUG if enable_debug else logging.INFO) module_name = args.module_name jinja_data_file = args.jinja_data_file target_dataset_type = args.target_dataset_type.lower() target_dataset = args.target_dataset bq_object_setting_str = args.bq_object_setting load_test_data = args.load_test_data allow_telemetry = args.allow_telemetry location = args.location skip_dag = args.skip_dag logging.info("Arguments:") logging.info(" module_name = %s", module_name) logging.info(" jinja_data_file = %s", jinja_data_file) logging.info(" target_dataset_type = %s", target_dataset_type) logging.info(" target_dataset = %s", target_dataset) logging.info(" bq_object_setting_str = %s", bq_object_setting_str) logging.info(" load_test_data = %s", load_test_data) logging.info(" debug = %s", enable_debug) logging.info(" allow_telemetry = %s", allow_telemetry) logging.info(" location = %s", location) logging.info(" skip_dag = %s", skip_dag) if not Path(jinja_data_file).is_file(): raise ValueError( f"🛑 jinja_data_file '{jinja_data_file}' does not exist.") try: bq_object_setting = json.loads(bq_object_setting_str) except Exception as e: raise ValueError(f"🛑 Failed to read table settings. Error = {e}.") return (module_name, jinja_data_file, target_dataset_type, target_dataset, bq_object_setting, load_test_data, allow_telemetry, location, skip_dag) def main(): # Parse and validate arguments. (module_name, jinja_data_file, target_dataset_type, target_dataset, bq_object_setting, load_test_data, allow_telemetry, location, skip_dag) = _parse_args() sql_file = bq_object_setting["sql_file"] if not Path(sql_file).is_file(): raise ValueError(f"🛑 sql_file '{sql_file}' does not exist.") bq_client = cortex_bq_client.CortexBQClient() # Render core sql text from sql file after applying Jinja parameters. rendered_sql = jinja.apply_jinja_params_to_file(sql_file, jinja_data_file) logging.debug("Rendered SQL: %s", rendered_sql) # Validate core sql. generate_assets.validate_sql(bq_client, rendered_sql) object_type = bq_object_setting["type"] object_description = bq_object_setting.get("description") if object_type in ["table", "view"]: object_name = Path(sql_file).stem logging.info("Generating %s %s '%s'...", target_dataset_type, object_type, object_name) object_name_full = target_dataset + "." + object_name if bq_helper.table_exists(bq_client, object_name_full): logging.info("%s %s '%s' already exists.", target_dataset_type, object_type, object_name) # For non-reporting dataset types (e.g. cdc), if table or view # exists, we don't touch it. # NOTE: We can't generate DAG either, as for DAG generation, we # need table to be in place. if target_dataset_type != "reporting": logging.info("Skipping recreating %s.", object_type) sys.exit(0) # For "reporting" dataset type, we always create tables and views. # If reporting table or view exists, we need to drop it. else: logging.info("Dropping %s...", object_type) bq_client.delete_table(object_name_full) # Create view or table, based on object type. 
if object_type == "view": try: generate_assets.create_view(bq_client, object_name_full, object_description, rendered_sql) except BadRequest as e: if hasattr(e, "query_job") and e.query_job: # type: ignore query = e.query_job.query # type: ignore raise SystemExit(f"🛑 ERROR: Failed to create view. " f"Error = {e}. SQL: {query}") from e else: raise SystemExit(f"🛑 ERROR: Failed to create view. " f"Error = {e}.") from e except Exception as e: raise SystemExit(f"ERROR: Failed to create view. Error = {e}.") else: try: table_setting = bq_object_setting["table_setting"] bq_materializer.validate_table_setting(table_setting) generate_assets.create_table(bq_client, object_name_full, object_description, rendered_sql, table_setting) except BadRequest as e: if hasattr(e, "query_job") and e.query_job: # type: ignore query = e.query_job.query # type: ignore raise SystemExit(f"🛑 ERROR: Failed to create table. " f"Error = {e}. SQL: {query}") from e else: raise SystemExit(f"🛑 ERROR: Failed to create table. " f"Error = {e}.") from e except Exception as e: raise SystemExit( f"🛑 ERROR: Failed to create table. Error = {e}.") table_refresh_sql = generate_assets.generate_table_refresh_sql( bq_client, object_name_full, rendered_sql) # If we create table, we may also need to generate DAG files unless # they are task dependent, which are generated by the parent # processes `generate_build_files`. if not skip_dag: generate_assets.generate_dag_files( module_name, target_dataset_type, target_dataset, object_name, table_setting, table_refresh_sql, allow_telemetry, location, _TEMPLATE_DIR, generate_assets.GENERATED_DAG_DIR_NAME) # If we create table, we also need to populate it with test data # if flag is set for that. if load_test_data: try: logging.info("Populating table '%s' with test data...", object_name_full) query_job = bq_client.query(table_refresh_sql) # Wait for query to finish. _ = query_job.result() except BadRequest as e: if hasattr(e, "query_job") and e.query_job: # type: ignore query = e.query_job.query # type: ignore raise SystemExit(f"🛑 ERROR: Failed to load test data. " f"Error = {e}. SQL: {query}") from e else: raise SystemExit(f"🛑 ERROR: Failed to load test data. " f"Error = {e}.") from e except Exception as e: raise SystemExit( f"🛑 ERROR: Failed to load test data. Error = {e}.") logging.info("Generated %s %s '%s' successfully.", target_dataset_type, object_type, object_name) # NOTE: For "script" type of object, we do not have a way to know if # underlying object (function or stored proc) already exists. Because of # this limitation, for non-reporting dataset types, we can't skip the step # of creating the object if it's already present. The script should check # for object existence if required. if object_type == "script": logging.info("Executing script '%s'...", sql_file) try: query_job = bq_client.query(query=rendered_sql) # Wait for query to finish. _ = query_job.result() except Exception as e: raise SystemExit("🛑 ERROR: Failed to run sql.\n" "----\n" f"SQL = \n{rendered_sql}\n" "----\n" f"Error = {e}") logging.info("Executed script '%s' successfully.", sql_file) if __name__ == "__main__": main()