def main()

in src/k9/src/meridian/src/common/materializer/create_bq_object.py [0:0]


def main():

    # Parse and validate arguments.
    (module_name, jinja_data_file, target_dataset_type, target_dataset,
     bq_object_setting, load_test_data, allow_telemetry, location,
     skip_dag) = _parse_args()

    sql_file = bq_object_setting["sql_file"]
    if not Path(sql_file).is_file():
        raise ValueError(f"🛑 sql_file '{sql_file}' does not exist.")

    bq_client = cortex_bq_client.CortexBQClient()

    # Render core sql text from sql file after applying Jinja parameters.
    rendered_sql = jinja.apply_jinja_params_to_file(sql_file, jinja_data_file)
    logging.debug("Rendered SQL: %s", rendered_sql)
    # Validate core sql.
    generate_assets.validate_sql(bq_client, rendered_sql)

    object_type = bq_object_setting["type"]
    object_description = bq_object_setting.get("description")

    if object_type in ["table", "view"]:
        object_name = Path(sql_file).stem
        logging.info("Generating %s %s '%s'...", target_dataset_type,
                     object_type, object_name)
        object_name_full = target_dataset + "." + object_name

        if bq_helper.table_exists(bq_client, object_name_full):
            logging.info("%s %s '%s' already exists.", target_dataset_type,
                         object_type, object_name)
            # For non-reporting dataset types (e.g. cdc), if table or view
            # exists, we don't touch it.
            # NOTE: We can't generate DAG either, as for DAG generation, we
            # need table to be in place.
            if target_dataset_type != "reporting":
                logging.info("Skipping recreating %s.", object_type)
                sys.exit(0)
            # For "reporting" dataset type, we always create tables and views.
            # If reporting table or view exists, we need to drop it.
            else:
                logging.info("Dropping %s...", object_type)
                bq_client.delete_table(object_name_full)

        # Create view or table, based on object type.
        if object_type == "view":
            try:
                generate_assets.create_view(bq_client, object_name_full,
                                            object_description, rendered_sql)
            except BadRequest as e:
                if hasattr(e, "query_job") and e.query_job:  # type: ignore
                    query = e.query_job.query  # type: ignore
                    raise SystemExit(f"🛑 ERROR: Failed to create view. "
                                     f"Error = {e}. SQL: {query}") from e
                else:
                    raise SystemExit(f"🛑 ERROR: Failed to create view. "
                                     f"Error = {e}.") from e
            except Exception as e:
                raise SystemExit(f"ERROR: Failed to create view. Error = {e}.")
        else:
            try:
                table_setting = bq_object_setting["table_setting"]
                bq_materializer.validate_table_setting(table_setting)
                generate_assets.create_table(bq_client, object_name_full,
                                             object_description, rendered_sql,
                                             table_setting)
            except BadRequest as e:
                if hasattr(e, "query_job") and e.query_job:  # type: ignore
                    query = e.query_job.query  # type: ignore
                    raise SystemExit(f"🛑 ERROR: Failed to create table. "
                                     f"Error = {e}. SQL: {query}") from e
                else:
                    raise SystemExit(f"🛑 ERROR: Failed to create table. "
                                     f"Error = {e}.") from e
            except Exception as e:
                raise SystemExit(
                    f"🛑 ERROR: Failed to create table. Error = {e}.")

            table_refresh_sql = generate_assets.generate_table_refresh_sql(
                bq_client, object_name_full, rendered_sql)

            # If we create table, we may also need to generate DAG files unless
            # they are task dependent, which are generated by the parent
            # processes `generate_build_files`.
            if not skip_dag:
                generate_assets.generate_dag_files(
                    module_name, target_dataset_type, target_dataset,
                    object_name, table_setting, table_refresh_sql,
                    allow_telemetry, location, _TEMPLATE_DIR,
                    generate_assets.GENERATED_DAG_DIR_NAME)

            # If we create table, we also need to populate it with test data
            # if flag is set for that.
            if load_test_data:
                try:
                    logging.info("Populating table '%s' with test data...",
                                 object_name_full)
                    query_job = bq_client.query(table_refresh_sql)
                    # Wait for query to finish.
                    _ = query_job.result()
                except BadRequest as e:
                    if hasattr(e, "query_job") and e.query_job:  # type: ignore
                        query = e.query_job.query  # type: ignore
                        raise SystemExit(f"🛑 ERROR: Failed to load test data. "
                                         f"Error = {e}. SQL: {query}") from e
                    else:
                        raise SystemExit(f"🛑 ERROR: Failed to load test data. "
                                         f"Error = {e}.") from e
                except Exception as e:
                    raise SystemExit(
                        f"🛑 ERROR: Failed to load test data. Error = {e}.")

        logging.info("Generated %s %s '%s' successfully.", target_dataset_type,
                     object_type, object_name)

    # NOTE: For "script" type of object, we do not have a way to know if
    # underlying object (function or stored proc) already exists. Because of
    # this limitation, for non-reporting dataset types, we can't skip the step
    # of creating the object if it's already present. The script should check
    # for object existence if required.
    if object_type == "script":
        logging.info("Executing script '%s'...", sql_file)
        try:
            query_job = bq_client.query(query=rendered_sql)
            # Wait for query to finish.
            _ = query_job.result()
        except Exception as e:
            raise SystemExit("🛑 ERROR: Failed to run sql.\n"
                             "----\n"
                             f"SQL = \n{rendered_sql}\n"
                             "----\n"
                             f"Error = {e}")
        logging.info("Executed script '%s' successfully.", sql_file)