in common/materializer/generate_assets.py [0:0]
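# Standard-library imports used by this function; `constants` and
# `dag_generator` are module-level imports defined elsewhere in this file.
import datetime
import logging
from pathlib import Path

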
def generate_dag_files(module_name: str, target_dataset_type: str,
                       target_dataset: str, table_name: str,
                       table_setting: dict, table_refresh_sql: str,
                       allow_telemetry: bool, location: str,
                       template_dir: Path, generated_dag_dir: Path) -> None:
"""Generates necessary DAG files to refresh a given table.
There are two files to be generated:
1. Python file - this is the main DAG file, and is generated using a
template.
2. BigQuery SQL file that the DAG needs to execute to refresh a table.
Naming schema:
Dag Name :
<project>_<dataset>_refresh_<table>
Dag Full Name (shown in Airflow UI):
<module>_<dataset_type>_<dag_name>
Output Directory:
<dag_dir>/<module>/<dataset_type>/<dag_name>.py
Python file:
<output_directory>/<dag_name>.py
SQL file:
<output_directory>/sql_scripts/<dag_name>.sql
e.g.
dag_dir/cm360/reporting/
project1_dataset1_refresh_clicks.py
project1_dataset1_refresh_impressions.py
sql_scripts/
project1_dataset1_refresh_clicks.sql
project1_dataset1_refresh_impressions.sql
Args:
module_name: Name of module (e.g. "cm360", "sap")
target_dataset_type: Type of dataset - e.g. "reporting" or "cdc".
target_dataset: Bigquery dataset including GCP project id.
e.g. "my_project.my_dataset".
table_name: Table name to refresh. (e.g. "CustomerMD")
table_setting: Table Settings as defined in the settings file.
table_refresh_sql: SQL with logic to populate data in the table.
allow_telemetry: Bool from Cortex config file to specify if
telemetry is allowed.
location: Location to pass to BigQueryInsertJob operators in DAGs.
template_dir: directory where python dag template is stored.
generated_dag_dir: directory where generated dag will be materialized.
"""
dag_name = "_".join(
[target_dataset.replace(".", "_"), "refresh", table_name])
dag_full_name = "_".join(
[module_name.lower(), target_dataset_type, dag_name])
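    # With the docstring example values, dag_name is
    # "project1_dataset1_refresh_clicks" and dag_full_name is
    # "cm360_reporting_project1_dataset1_refresh_clicks".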
    # Directory to store generated files - e.g. "dag_dir/cm360/reporting/".
    output_dir = Path(generated_dag_dir, module_name.lower(),
                      target_dataset_type)
    # Generate SQL file.
    sql_file = Path("sql_scripts", dag_name).with_suffix(".sql")
    output_sql_file = Path(output_dir, sql_file)
    output_sql_file.parent.mkdir(exist_ok=True, parents=True)
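    # With the docstring example, this writes e.g.
    # "dag_dir/cm360/reporting/sql_scripts/project1_dataset1_refresh_clicks.sql".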
with output_sql_file.open(mode="w+", encoding="utf-8") as sqlf:
sqlf.write(table_refresh_sql)
logging.info("Generated DAG SQL file : %s", output_sql_file)
    # Generate Python DAG file.
    python_dag_template_file = Path(template_dir,
                                    "airflow_dag_template_reporting.py")
    output_py_file = Path(output_dir, dag_name).with_suffix(".py")
    today = datetime.datetime.now()
    load_frequency = table_setting["load_frequency"]
    # TODO: Figure out a way to do lowercase in string template substitution
    # directly.
    py_subs = {
        "dag_full_name": dag_full_name,
        "lower_module_name": module_name.lower(),
        "lower_tgt_dataset_type": target_dataset_type,
        "query_file": str(sql_file),
        "load_frequency": load_frequency,
        "year": today.year,
        "month": today.month,
        "day": today.day,
        "runtime_labels_dict": "",  # Placeholder for the label dict string.
        "bq_location": location
    }
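    # The keys above must match the placeholders in the DAG template file;
    # year/month/day presumably feed the generated DAG's start date.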
    # Add BigQuery job labels to py_subs if telemetry is allowed.
    # CORTEX_JOB_LABEL is converted to str for substitution purposes.
    if allow_telemetry:
        py_subs["runtime_labels_dict"] = str(constants.CORTEX_JOB_LABEL)
    if target_dataset_type == "reporting":
        py_subs["tags"] = [module_name.lower(), "reporting"]
    dag_generator.generate_file_from_template(python_dag_template_file,
                                              output_py_file, **py_subs)
    logging.info("Generated DAG Python file: %s", output_py_file)