# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Defines utility functions to generate assets e.g. BQ tables and DAG files.
Note that functions specific to task dependent dags are defined separately in
dependent_dags.py
"""

import datetime
import json
import logging
from pathlib import Path
import string
import textwrap
from typing import Optional

import yaml
from google.cloud import bigquery
from google.cloud.exceptions import NotFound

from common.py_libs import bq_materializer
from common.py_libs import constants
from common.py_libs import dag_generator

# TODO: create unit tests.

GENERATED_BUILD_DIR_NAME = Path.cwd() / "generated_materializer_build_files"
GENERATED_DAG_DIR_NAME = Path.cwd() / "generated_materializer_dag_files"
JINJA_DATA_FILE_NAME = "bq_sql_jinja_data.json"


def get_task_dep_materializer_settings(settings_file: Path) -> Path:
"""Returns the path to the task dependent version of the settings file."""
ext = settings_file.suffix
td_file_name = Path(f"{settings_file.stem}_task_dep{ext}")
return settings_file.parent / td_file_name
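

# For illustration, a hypothetical input/output pair (file name invented):
#   get_task_dep_materializer_settings(Path("reporting_settings.yaml"))
#   -> Path("reporting_settings_task_dep.yaml")

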
def get_enabled_task_dep_settings_file(base_settings_file: Path,
                                       config_dict: dict) -> Optional[Path]:
    """Returns the path to the task-dependent settings file.

    A path is returned only if the file exists and task dependencies are
    enabled in the config; otherwise None is returned.
    """
    if config_dict.get("enableTaskDependencies"):
        td_settings_file = get_task_dep_materializer_settings(
            base_settings_file)
        if td_settings_file.exists():
            return td_settings_file
    return None
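

# For illustration, a hypothetical call (the config key matches the one read
# above; the file name is invented):
#   get_enabled_task_dep_settings_file(Path("reporting_settings.yaml"),
#                                      {"enableTaskDependencies": True})
#   -> Path("reporting_settings_task_dep.yaml") if that file exists,
#      otherwise None.

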
def generate_dag_files(module_name: str, target_dataset_type: str,
target_dataset: str, table_name: str,
table_setting: dict, table_refresh_sql: str,
allow_telemetry: bool, location: str, template_dir: Path,
generated_dag_dir: Path) -> None:
"""Generates necessary DAG files to refresh a given table.
There are two files to be generated:
1. Python file - this is the main DAG file, and is generated using a
template.
2. BigQuery SQL file that the DAG needs to execute to refresh a table.
Naming schema:
Dag Name :
<project>_<dataset>_refresh_<table>
Dag Full Name (shown in Airflow UI):
<module>_<dataset_type>_<dag_name>
Output Directory:
<dag_dir>/<module>/<dataset_type>/<dag_name>.py
Python file:
<output_directory>/<dag_name>.py
SQL file:
<output_directory>/sql_scripts/<dag_name>.sql
e.g.
dag_dir/cm360/reporting/
project1_dataset1_refresh_clicks.py
project1_dataset1_refresh_impressions.py
sql_scripts/
project1_dataset1_refresh_clicks.sql
project1_dataset1_refresh_impressions.sql
Args:
module_name: Name of module (e.g. "cm360", "sap")
target_dataset_type: Type of dataset - e.g. "reporting" or "cdc".
target_dataset: Bigquery dataset including GCP project id.
e.g. "my_project.my_dataset".
table_name: Table name to refresh. (e.g. "CustomerMD")
table_setting: Table Settings as defined in the settings file.
table_refresh_sql: SQL with logic to populate data in the table.
allow_telemetry: Bool from Cortex config file to specify if
telemetry is allowed.
location: Location to pass to BigQueryInsertJob operators in DAGs.
template_dir: directory where python dag template is stored.
generated_dag_dir: directory where generated dag will be materialized.
"""
dag_name = "_".join(
[target_dataset.replace(".", "_"), "refresh", table_name])
dag_full_name = "_".join(
[module_name.lower(), target_dataset_type, dag_name])
# Directory to store generated files - e.g. "dag_dir/cm360/reporting/"
output_dir = Path(generated_dag_dir, module_name.lower(),
target_dataset_type)
# Generate sql file.
sql_file = Path("sql_scripts", dag_name).with_suffix(".sql")
output_sql_file = Path(output_dir, sql_file)
output_sql_file.parent.mkdir(exist_ok=True, parents=True)
with output_sql_file.open(mode="w+", encoding="utf-8") as sqlf:
sqlf.write(table_refresh_sql)
logging.info("Generated DAG SQL file : %s", output_sql_file)
# Generate python DAG file.
python_dag_template_file = Path(template_dir,
"airflow_dag_template_reporting.py")
output_py_file = Path(output_dir, dag_name).with_suffix(".py")
today = datetime.datetime.now()
load_frequency = table_setting["load_frequency"]
# TODO: Figure out a way to do lowercase in string template substitution
# directly.
py_subs = {
"dag_full_name": dag_full_name,
"lower_module_name": module_name.lower(),
"lower_tgt_dataset_type": target_dataset_type,
"query_file": str(sql_file),
"load_frequency": load_frequency,
"year": today.year,
"month": today.month,
"day": today.day,
"runtime_labels_dict": "", # A place holder for label dict string,
"bq_location": location
}
    # Add bq_labels to the py_subs dict if telemetry is allowed.
    # Converts CORTEX_JOB_LABEL to str for substitution purposes.
if allow_telemetry:
py_subs["runtime_labels_dict"] = str(constants.CORTEX_JOB_LABEL)
if target_dataset_type == "reporting":
py_subs["tags"] = [module_name.lower(), "reporting"]
dag_generator.generate_file_from_template(python_dag_template_file,
output_py_file, **py_subs)
logging.info("Generated dag python file: %s", output_py_file)
def create_view(bq_client: bigquery.Client, view_name: str,
                description: Optional[str], core_sql: str) -> None:
"""Creates BQ Reporting view."""
create_view_sql = ("CREATE OR REPLACE VIEW `" + view_name + "` " +
f"OPTIONS(description=\"{description or ''}\") AS (\n" +
textwrap.indent(core_sql, " ") + "\n)")
create_view_job = bq_client.query(create_view_sql)
_ = create_view_job.result()
logging.info("Created view '%s'", view_name)
def create_table(bq_client: bigquery.Client, full_table_name: str,
                 description: Optional[str], sql_str: str,
                 table_setting: dict) -> None:
"""Creates empty BQ Reporting table."""
# Steps to create table:
# a. Use core_sql to create a "temporary" table, and use it to get schema.
# b. Add partition and cluster clauses.
# c. Create final table using above two.
# NOTE: The other option is to create the table directly using
# "CREATE TABLE" DDL, but applying partition and cluster clauses is
# more complex.
# Create a temp table using table create query to get schema.
# -------------------------------------------------------------
# NOTE: We can't create BQ temp table using `create_table` API call.
# Hence, creating a regular table as temp table.
temp_table_name = full_table_name + "_temp"
bq_client.delete_table(temp_table_name, not_found_ok=True)
logging.info("Creating temporary table '%s'", temp_table_name)
temp_table_sql = ("CREATE TABLE `" + temp_table_name + "` " +
"OPTIONS(expiration_timestamp=TIMESTAMP_ADD(" +
"CURRENT_TIMESTAMP(), INTERVAL 12 HOUR))" +
" AS (\n SELECT * FROM (\n" +
textwrap.indent(sql_str, " ") + "\n)\n" +
" WHERE FALSE\n)")
logging.debug("temporary table sql = '%s'", temp_table_sql)
create_temp_table_job = bq_client.query(temp_table_sql)
_ = create_temp_table_job.result()
logging.info("Temporary table created.")
table_schema = bq_client.get_table(temp_table_name).schema
logging.debug("Table schema = \n'%s'", table_schema)
# Create final actual table.
# -------------------------
logging.info("Creating actual table '%s'", full_table_name)
table = bigquery.Table(full_table_name, schema=table_schema)
# Add partition and cluster details.
partition_details = table_setting.get("partition_details")
if partition_details:
table = bq_materializer.add_partition_to_table_def(
table, partition_details)
cluster_details = table_setting.get("cluster_details")
if cluster_details:
table = bq_materializer.add_cluster_to_table_def(table, cluster_details)
# Add optional description
if description:
table.description = description
    try:
        _ = bq_client.create_table(table)
        logging.info("Created table '%s'", full_table_name)
    finally:
        # Cleanup - remove the temporary table.
        bq_client.delete_table(temp_table_name, not_found_ok=True)
        try:
            bq_client.get_table(temp_table_name)
            logging.warning(
                "⚠️ Couldn't delete temporary table `%s`. "
                "Please delete it manually. ⚠️", temp_table_name)
        except NotFound:
            logging.info("Deleted temp table '%s'", temp_table_name)
def generate_table_refresh_sql(bq_client: bigquery.Client, full_table_name: str,
sql_str: str) -> str:
"""Returns sql for refreshing a table with results from a sql query."""
table_schema = bq_client.get_table(full_table_name).schema
table_columns = [f"`{field.name}`" for field in table_schema]
    # We want the table refresh to be atomic. Wrapping TRUNCATE and INSERT
    # within a transaction achieves that. Without it, customers may see
    # tables missing data (albeit momentarily) while the table refresh DAG
    # executes.
# TODO: Indent the string below for readability. Handle output using dedent.
table_refresh_sql_text = """
BEGIN
BEGIN TRANSACTION;
TRUNCATE TABLE `${full_table_name}`;
INSERT INTO `${full_table_name}`
(
${table_columns}
)
${select_statement}
;
COMMIT TRANSACTION;
EXCEPTION WHEN ERROR THEN
ROLLBACK TRANSACTION;
RAISE USING MESSAGE = @@error.message;
END;
"""
table_refresh_sql = string.Template(table_refresh_sql_text).substitute(
full_table_name=full_table_name,
table_columns=textwrap.indent(",\n".join(table_columns), " " * 8),
select_statement=textwrap.indent(sql_str, " " * 4))
logging.debug("Table Refresh SQL = \n%s", table_refresh_sql)
return table_refresh_sql
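

# For illustration, for a hypothetical table `proj.ds.t` with columns `a` and
# `b`, the rendered statement has this shape:
#   BEGIN
#     BEGIN TRANSACTION;
#     TRUNCATE TABLE `proj.ds.t`;
#     INSERT INTO `proj.ds.t`
#     (
#         `a`,
#         `b`
#     )
#     <select_statement>;
#     COMMIT TRANSACTION;
#   EXCEPTION WHEN ERROR THEN
#     ROLLBACK TRANSACTION;
#     RAISE USING MESSAGE = @@error.message;
#   END;

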
def validate_sql(bq_client: bigquery.Client, sql: str) -> None:
"""Runs a given sql in BQ to verify if the syntax is correct."""
logging.info("Validating SQL file....")
job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
try:
_ = bq_client.query(query=sql, job_config=job_config)
except Exception as e:
raise SystemExit("🛑 ERROR: Failed to parse sql.\n"
"----\n"
f"SQL = \n{sql}"
"\n----\n") from e
logging.info("SQL file is valid.")
def get_materializer_settings(materializer_settings_file: str) -> dict:
"""Parses settings file and returns settings dict after validations."""
logging.info("Loading Materializer settings file '%s'...",
materializer_settings_file)
with open(materializer_settings_file,
encoding="utf-8") as materializer_settings_fp:
materializer_settings = yaml.safe_load(materializer_settings_fp)
if materializer_settings is None:
raise ValueError(f"🛑 '{materializer_settings_file}' file is empty.")
logging.debug("Materializer settings for this module : \n%s",
json.dumps(materializer_settings, indent=4))
# Validate bq object settings.
# Since this setting file contains two separate bq table setting sections,
# we validate both of them.
independent_tables_settings = materializer_settings.get(
"bq_independent_objects")
dependent_tables_settings = materializer_settings.get(
"bq_dependent_objects")
# At least one of the two sections needs to be present.
if (independent_tables_settings is None and
dependent_tables_settings is None):
        raise ValueError(
            "🛑 'bq_independent_objects' and 'bq_dependent_objects' cannot "
            "both be missing.")
for settings in [independent_tables_settings, dependent_tables_settings]:
if settings:
bq_materializer.validate_bq_materializer_settings(settings)
return materializer_settings
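

# For illustration, a minimal hypothetical settings file this function accepts
# (object-level entries are validated by validate_bq_materializer_settings):
#   bq_independent_objects:
#       - ...
#   bq_dependent_objects:
#       - ...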