in src/common/materializer/dependent_dags.py [0:0]
def create_dep_dag(self, dag_name: str,
                   task_dep_objs: dict[str, dag_types.BqObject]) -> Path:
    """Creates a single DAG from a map of task dependent objects.

    Nodes without parents are given an inferred "start_task" parent, and
    nodes that no other node names as a parent become the parents of an
    inferred "stop_task" node. The resulting graph is topologically sorted
    and handed to the DAG file generator.

    Args:
        dag_name: Name of the DAG to generate.
        task_dep_objs: Map of sql file names to their BQ Object settings.
            This argument is mutated to add a "start_task" parent to all
            top level nodes and a "stop_task" key with leaf nodes as
            parents.

    Returns:
        Path to generated DAG file.

    Raises:
        ValueError: If a cycle is detected in the DAG.
        RuntimeError: If a task dependent object is provided without
            dag_setting defined.
    """
    topo_sorter = graphlib.TopologicalSorter()
    # Top level nodes only have the root as a parent dependency.
    top_level_nodes = {}
    # Assume every node is a leaf until some other node names it as a
    # parent; those are removed while walking the map below.
    leaf_nodes = set(task_dep_objs)

    for sql_file, obj in task_dep_objs.items():
        dag_setting = _get_dag_setting(obj)
        if not dag_setting:
            raise RuntimeError(
                "Task dependent object must have dag_setting defined: "
                f"{obj}")

        # Add an inferred root parent for top level nodes.
        if not dag_setting.parents:
            top_level_nodes[sql_file] = obj
            dag_setting.parents = ["start_task"]

        # Anything that is a parent of this node cannot be a leaf.
        leaf_nodes.difference_update(dag_setting.parents)
        topo_sorter.add(sql_file, *dag_setting.parents)

    # Set iteration order for strings depends on the process hash seed, so
    # sort the leaf names to keep the "stop_task" parents identical across
    # runs for the same input.
    stop_task_parents = sorted(leaf_nodes)

    # Add an inferred stop node.
    topo_sorter.add("stop_task", *stop_task_parents)
    task_dep_objs["stop_task"] = dag_types.BqObject(
        type=dag_types.BqObjectType.BQ_OBJECT_TYPE_UNSPECIFIED,
        table_setting=dag_types.Table(dag_setting=dag_types.Dag(
            name=dag_name, parents=stop_task_parents)))

    load_freq = get_validated_load_freq(top_level_nodes)

    try:
        ordered_nodes = [*topo_sorter.static_order()]
    except graphlib.CycleError as e:
        raise ValueError(f"Cyclic dependency detected in DAG: {dag_name}. "
                         "DAG must not have cycles.") from e

    return self._generate_dag_file(dag_name, ordered_nodes, task_dep_objs,
                                   load_freq)