in common/materializer/dependent_dags.py [0:0]
def get_validated_load_freq(
        top_level_objs: dict[str, dag_types.BqObject]) -> str:
    """Return validated load_frequency defined by top level nodes.

    Top level nodes must have a consistent schedule defined. Multiple nodes
    can define the same load_frequency, or nodes can leave it unset as long
    as one of the top level nodes has it set.

    Args:
        top_level_objs: map of top level BQ objects that are direct children
            of the DAG root keyed by sql file name.

    Returns:
        Validated load_frequency string defining DAG schedule.

    Raises:
        ValueError: if a top level node has no table_setting, if the top
            level nodes don't have any defined load_frequency, or if they
            conflict.
    """
    schedule = None
    for sql_file, obj in top_level_objs.items():
        # Raise explicitly instead of using `assert`: assertions are removed
        # when Python runs with -O, which would turn a missing table_setting
        # into an opaque AttributeError on the next line.
        if obj.table_setting is None:
            raise ValueError(
                f"Top level node {sql_file} has no table_setting defined. "
                "Top level nodes include: "
                f"{list(top_level_objs)}")

        load_frequency = obj.table_setting.load_frequency
        if load_frequency is None:
            # Unset is allowed as long as some other top level node sets it.
            continue

        if schedule is None:
            # First node that defines a schedule becomes the reference value.
            schedule = load_frequency
        elif load_frequency != schedule:
            raise ValueError(
                "Top level node deteced with conflicting load_frequency. "
                f"{sql_file} set to '{load_frequency}' but expected "
                f"'{schedule}'. Please resolve any load_frequency conflicts "
                f"among top level nodes: {list(top_level_objs)}")

    if schedule is None:
        raise ValueError(
            "At least one top level node must define load_frequency. Top level "
            f"nodes include: {list(top_level_objs)}")
    return schedule