in src/common/materializer/create_bq_object.py [0:0]
def _failure(action: str, error: Exception,
             include_sql: bool = False) -> SystemExit:
    """Builds the SystemExit used to abort when a BigQuery step fails.

    Args:
        action: Short description of the failed step, e.g. "create view".
        error: The exception that caused the failure.
        include_sql: When True and `error` has a truthy `query_job` attribute,
            the SQL text of that job (`query_job.query`) is appended to the
            message.

    Returns:
        A SystemExit whose message has the form
        "🛑 ERROR: Failed to <action>. Error = <error>." (optionally followed
        by " SQL: <query>"). The caller raises it, chained with `from error`.
    """
    message = f"🛑 ERROR: Failed to {action}. Error = {error}."
    query_job = getattr(error, "query_job", None) if include_sql else None
    if query_job:
        message += f" SQL: {query_job.query}"
    return SystemExit(message)


def main():
    """Creates a BigQuery table, view or script object from a settings entry.

    Reads the command-line arguments via `_parse_args`, renders the object's
    SQL file with Jinja parameters and validates the rendered SQL. Then:
      * "table" / "view": creates the object named after the SQL file stem in
        `target_dataset`. If it already exists, non-"reporting" dataset types
        exit (status 0) without touching it; "reporting" objects are dropped
        and recreated. Tables additionally get DAG files generated (unless
        `skip_dag` is set) and, when `load_test_data` is set, are populated
        by running the table refresh SQL.
      * "script": executes the rendered SQL.

    Raises:
        ValueError: If the SQL file does not exist, or the object type is not
            one of "table", "view" or "script".
        SystemExit: If creating the object, loading test data or running the
            script fails; also raised with code 0 (via `sys.exit`) when an
            existing non-reporting table or view is skipped.
    """
    # Parse and validate arguments.
    (module_name, jinja_data_file, target_dataset_type, target_dataset,
     bq_object_setting, load_test_data, allow_telemetry, location,
     skip_dag) = _parse_args()

    sql_file = bq_object_setting["sql_file"]
    if not Path(sql_file).is_file():
        raise ValueError(f"🛑 sql_file '{sql_file}' does not exist.")

    object_type = bq_object_setting["type"]
    object_description = bq_object_setting.get("description")
    # Fail fast on unknown types instead of silently doing nothing and
    # exiting successfully.
    if object_type not in ("table", "view", "script"):
        raise ValueError(f"🛑 Unsupported object type '{object_type}' for "
                         f"sql_file '{sql_file}'. Expected one of 'table', "
                         "'view' or 'script'.")

    bq_client = cortex_bq_client.CortexBQClient()

    # Render core sql text from sql file after applying Jinja parameters.
    rendered_sql = jinja.apply_jinja_params_to_file(sql_file, jinja_data_file)
    logging.debug("Rendered SQL: %s", rendered_sql)

    # Validate core sql.
    generate_assets.validate_sql(bq_client, rendered_sql)

    if object_type in ("table", "view"):
        object_name = Path(sql_file).stem
        logging.info("Generating %s %s '%s'...", target_dataset_type,
                     object_type, object_name)
        object_name_full = target_dataset + "." + object_name

        if bq_helper.table_exists(bq_client, object_name_full):
            logging.info("%s %s '%s' already exists.", target_dataset_type,
                         object_type, object_name)
            # For non-reporting dataset types (e.g. cdc), if table or view
            # exists, we don't touch it.
            # NOTE: We can't generate DAG either, as for DAG generation, we
            # need table to be in place.
            if target_dataset_type != "reporting":
                logging.info("Skipping recreating %s.", object_type)
                sys.exit(0)
            # For "reporting" dataset type, we always create tables and views,
            # so an existing reporting table or view is dropped first.
            logging.info("Dropping %s...", object_type)
            bq_client.delete_table(object_name_full)

        # Create view or table, based on object type.
        if object_type == "view":
            try:
                generate_assets.create_view(bq_client, object_name_full,
                                            object_description, rendered_sql)
            except BadRequest as e:
                raise _failure("create view", e, include_sql=True) from e
            except Exception as e:
                raise _failure("create view", e) from e
        else:
            try:
                table_setting = bq_object_setting["table_setting"]
                bq_materializer.validate_table_setting(table_setting)
                generate_assets.create_table(bq_client, object_name_full,
                                             object_description, rendered_sql,
                                             table_setting)
            except BadRequest as e:
                raise _failure("create table", e, include_sql=True) from e
            except Exception as e:
                raise _failure("create table", e) from e

            table_refresh_sql = generate_assets.generate_table_refresh_sql(
                bq_client, object_name_full, rendered_sql)

            # If we create table, we may also need to generate DAG files unless
            # they are task dependent, which are generated by the parent
            # processes `generate_build_files`.
            if not skip_dag:
                generate_assets.generate_dag_files(
                    module_name, target_dataset_type, target_dataset,
                    object_name, table_setting, table_refresh_sql,
                    allow_telemetry, location, _TEMPLATE_DIR,
                    generate_assets.GENERATED_DAG_DIR_NAME)

            # If we create table, we also need to populate it with test data
            # if flag is set for that.
            if load_test_data:
                logging.info("Populating table '%s' with test data...",
                             object_name_full)
                try:
                    query_job = bq_client.query(table_refresh_sql)
                    # Wait for query to finish.
                    _ = query_job.result()
                except BadRequest as e:
                    raise _failure("load test data", e, include_sql=True) from e
                except Exception as e:
                    raise _failure("load test data", e) from e

        logging.info("Generated %s %s '%s' successfully.", target_dataset_type,
                     object_type, object_name)

    # NOTE: For "script" type of object, we do not have a way to know if
    # underlying object (function or stored proc) already exists. Because of
    # this limitation, for non-reporting dataset types, we can't skip the step
    # of creating the object if it's already present. The script should check
    # for object existence if required.
    if object_type == "script":
        logging.info("Executing script '%s'...", sql_file)
        try:
            query_job = bq_client.query(query=rendered_sql)
            # Wait for query to finish.
            _ = query_job.result()
        except Exception as e:
            raise SystemExit("🛑 ERROR: Failed to run sql.\n"
                             "----\n"
                             f"SQL = \n{rendered_sql}\n"
                             "----\n"
                             f"Error = {e}") from e
        logging.info("Executed script '%s' successfully.", sql_file)