in common/materializer/dependent_dags.py [0:0]
def _generate_dag_file(self, dag_name: str, ordered_nodes: list[str],
                       task_dep_objs: dict[str, dag_types.BqObject],
                       load_frequency: str) -> Path:
    """Generates the Airflow DAG Python file for a task dependent DAG.

    Renders the DAG header template, then walks `ordered_nodes` and, for
    every node other than "start_task", emits an edge statement linking the
    node to its parent task(s). For every node that is neither "start_task"
    nor "stop_task", it also generates the node's SQL file and renders a
    BigQuery operator from the operator template. The header, operators and
    edges are joined with blank lines and written to
    `<output_dir>/<dag_name>/<dag_full_name>.py`.

    Args:
        dag_name: Name of the DAG. Used for the output sub-directory and as
            the suffix of the full DAG name.
        ordered_nodes: Node names in the order their operators and edges
            should be emitted. "start_task" and "stop_task" are treated as
            special marker nodes; other entries are SQL file names.
        task_dep_objs: Maps each node name to the object whose
            `table_setting.dag_setting.parents` lists the node's parents.
        load_frequency: Value substituted into the templates as
            "load_frequency".

    Returns:
        Path of the generated DAG file.

    Raises:
        ValueError: If a node that needs an edge has no parent tasks.
    """
    logging.info("Generating DAG file for: %s", dag_name)

    template_dir = Path(__file__).resolve().parent / "templates"
    header_template = (template_dir /
                       "airflow_task_dep_dag_template_reporting.py")
    bq_op_template = template_dir / "bq_insert_job_template.txt"

    dag_full_name = "_".join(
        [self.target_dataset_name.replace(".", "_"), dag_name])
    today = datetime.datetime.now()

    # General template substitutions.
    subs = {
        "dag_full_name": dag_full_name,
        "module_name": self.module_name,
        "tgt_dataset_type": self.target_dataset_type,
        "load_frequency": load_frequency,
        "year": today.year,
        "month": today.month,
        "day": today.day,
        "runtime_labels_dict": "",  # A place holder for label dict string,
        "bq_location": self.location
    }
    if self.allow_telemetry:
        subs["runtime_labels_dict"] = str(constants.CORTEX_JOB_LABEL)
    if self.target_dataset_type == "reporting":
        subs["tags"] = [self.module_name, self.target_dataset_type]

    # Create DAG header.
    dag_header = dag_generator.generate_str_from_template(
        header_template, **subs)

    # Create BQ ops and edges.
    indent = " " * 4
    bq_op_strs = []
    edge_strs = []
    for sql_file in ordered_nodes:
        # Start tasks don't have parent edges or BQ operators.
        if sql_file == "start_task":
            continue

        is_stop_task = sql_file == "stop_task"
        table_name = Path(sql_file).stem
        task_id = sql_file if is_stop_task else f"refresh_{table_name}"

        # Generate edges.
        obj = task_dep_objs[sql_file]
        parents = _get_parent_task_ids(
            obj.table_setting.dag_setting.parents)  # type: ignore
        # Validate explicitly: an `assert` is stripped under `python -O`,
        # and an empty parent list would otherwise surface as an opaque
        # IndexError below.
        if not parents:
            raise ValueError(
                f"Task '{task_id}' in DAG '{dag_name}' has no parent "
                "tasks; cannot generate its dependency edge.")
        if len(parents) > 1:
            # Sorted so the generated file is stable between runs.
            parents_str = ", ".join(sorted(parents))
            edge_str = f"[{parents_str}] >> {task_id}"
        else:
            edge_str = f"{parents[0]} >> {task_id}"
        edge_strs.append(textwrap.indent(edge_str, indent))

        # Don't generate BQ operator for stop tasks.
        if is_stop_task:
            continue

        generated_sql_file = self._generate_sql_file(sql_file, dag_name)

        # SQL file specific substitutions, layered over the general ones
        # without mutating the shared dict.
        file_subs = {
            **subs,
            "table_name": table_name,
            "query_file": generated_sql_file,
        }

        # Generate BQ operators.
        bq_op = dag_generator.generate_str_from_template(
            bq_op_template, **file_subs)
        bq_op_strs.append(textwrap.indent(bq_op, indent))

    generated_dag = "\n\n".join([dag_header, *bq_op_strs, *edge_strs])

    generated_dag_file = (self.output_dir / dag_name /
                          dag_full_name).with_suffix(".py")
    generated_dag_file.parent.mkdir(parents=True, exist_ok=True)
    # Python source files are decoded as UTF-8 by default, so write the
    # DAG with an explicit encoding rather than the platform locale's.
    generated_dag_file.write_text(generated_dag, encoding="utf-8")

    logging.info("Generated DAG py file : %s", generated_dag_file)
    return generated_dag_file