`_generate_dag_file()` — method in common/materializer/dependent_dags.py


    def _generate_dag_file(self, dag_name: str, ordered_nodes: list[str],
                           task_dep_objs: dict[str, dag_types.BqObject],
                           load_frequency: str) -> Path:
        """Generates an Airflow DAG python file for the given DAG.

        Renders the DAG header from a template, emits one BigQuery
        insert-job operator per SQL node plus the `>>` dependency edges
        implied by each node's configured parents, and writes the combined
        text to `<output_dir>/<dag_name>/<dag_full_name>.py`.

        Args:
            dag_name: Short name of the DAG; used as the output subdirectory
                and as the suffix of the full DAG name.
            ordered_nodes: SQL file names in execution order. May contain
                the synthetic "start_task" and "stop_task" sentinels.
            task_dep_objs: Mapping of SQL file name to the BqObject holding
                its dependency (parent) settings.
            load_frequency: Schedule string substituted into the DAG header
                template.

        Returns:
            Path of the generated DAG .py file.

        Raises:
            ValueError: If a node other than "start_task" has no parents.
        """
        logging.info("Generating DAG file for: %s", dag_name)

        parent_dir = Path(__file__).resolve().parent
        template_dir = parent_dir / "templates"
        header_template = (template_dir /
                           "airflow_task_dep_dag_template_reporting.py")
        bq_op_template = template_dir / "bq_insert_job_template.txt"
        dag_full_name = "_".join(
            [self.target_dataset_name.replace(".", "_"), dag_name])
        today = datetime.datetime.now()

        # General template substitutions.
        subs = {
            "dag_full_name": dag_full_name,
            "module_name": self.module_name,
            "tgt_dataset_type": self.target_dataset_type,
            "load_frequency": load_frequency,
            "year": today.year,
            "month": today.month,
            "day": today.day,
            "runtime_labels_dict": "",  # Placeholder for the label dict string.
            "bq_location": self.location
        }
        if self.allow_telemetry:
            subs["runtime_labels_dict"] = str(constants.CORTEX_JOB_LABEL)
        if self.target_dataset_type == "reporting":
            subs["tags"] = [self.module_name, self.target_dataset_type]

        # Create DAG header.
        dag_header = dag_generator.generate_str_from_template(
            header_template, **subs)

        # Create BQ ops and edges.
        bq_op_strs = []
        edge_strs = []
        for sql_file in ordered_nodes:
            # Start tasks don't have parent edges or BQ operators.
            if sql_file == "start_task":
                continue

            table_name = Path(sql_file).stem
            if sql_file == "stop_task":
                task_id = sql_file
            else:
                task_id = f"refresh_{table_name}"

            # Generate edges.
            obj = task_dep_objs[sql_file]
            parents = _get_parent_task_ids(
                obj.table_setting.dag_setting.parents)  # type: ignore
            # Explicit check instead of `assert`: asserts are stripped under
            # `python -O`, and an empty parent list would otherwise surface
            # as an opaque IndexError on `parents[0]` below.
            if not parents:
                raise ValueError(
                    f"Node '{sql_file}' in DAG '{dag_name}' has no parents.")
            if len(parents) > 1:
                parents_str = ", ".join(sorted(parents))
                edge_str = f"[{parents_str}] >> {task_id}"
            else:
                edge_str = f"{parents[0]} >> {task_id}"

            edge_str = textwrap.indent(edge_str, " " * 4)
            edge_strs.append(edge_str)

            # Don't generate BQ operator for stop tasks.
            if sql_file == "stop_task":
                continue

            generated_sql_file = self._generate_sql_file(sql_file, dag_name)

            # SQL file specific substitutions. Reuses `subs` from above;
            # these keys are overwritten on every iteration.
            subs["table_name"] = table_name
            subs["query_file"] = generated_sql_file

            # Generate BQ operators.
            bq_op = dag_generator.generate_str_from_template(
                bq_op_template, **subs)
            bq_op = textwrap.indent(bq_op, " " * 4)
            bq_op_strs.append(bq_op)

        generated_dag = "\n\n".join([dag_header, *bq_op_strs, *edge_strs])
        generated_dag_file = (self.output_dir / dag_name /
                              dag_full_name).with_suffix(".py")
        generated_dag_file.parent.mkdir(parents=True, exist_ok=True)
        generated_dag_file.write_text(generated_dag)

        logging.info("Generated DAG py file : %s", generated_dag_file)
        return generated_dag_file