in libs/libcommon/src/libcommon/processing_graph.py [0:0]
def __post_init__(self) -> None:
    """Build the processing graph from ``self.specification`` and validate it.

    Two passes are made over the specification: the first registers every
    step as a graph node, the second adds one edge per ``triggered_by``
    parent. Splitting the work this way lets a step reference a parent that
    appears later in the specification.

    On success the following attributes are set on the instance:
    ``_nx_graph``, ``_processing_steps``,
    ``_processing_step_names_by_input_type``, ``_first_processing_steps``
    (nodes with no incoming edge), ``_topologically_ordered_processing_steps``
    and ``_alphabetically_ordered_processing_steps``.

    Raises:
        ValueError: if a step name is registered twice, if a dataset-level
            step sets ``bonus_difficulty_if_dataset_is_big``, if a step is
            triggered by an undefined step, if the graph has a cycle, or if
            a step without parents is not dataset-level.

    NOTE(review): ``guard_input_type`` and
    ``check_one_of_parents_is_same_or_higher_level`` are defined outside this
    block and presumably raise on invalid input -- confirm against their
    definitions.
    """
    graph = nx.DiGraph()
    steps_by_name: dict[str, ProcessingStep] = {}
    names_by_input_type: dict[InputType, list[str]] = {"dataset": [], "config": [], "split": []}

    # Pass 1: create one node and one ProcessingStep per specification entry.
    for name, spec in self.specification.items():
        # Validate the declared input type before anything is registered.
        input_type = guard_input_type(spec.get("input_type", DEFAULT_INPUT_TYPE))
        already_registered = (
            graph.has_node(name)
            or name in steps_by_name
            or name in names_by_input_type[input_type]
        )
        if already_registered:
            raise ValueError(f"Processing step {name} is defined twice.")

        graph.add_node(name)
        step = ProcessingStep(
            name=name,
            input_type=input_type,
            job_runner_version=spec.get("job_runner_version", DEFAULT_JOB_RUNNER_VERSION),
            difficulty=spec.get("difficulty", DEFAULT_DIFFICULTY),
            bonus_difficulty_if_dataset_is_big=spec.get("bonus_difficulty_if_dataset_is_big", 0),
        )
        steps_by_name[name] = step
        if step.bonus_difficulty_if_dataset_is_big and input_type == "dataset":
            raise ValueError(
                f"Processing step {name} has bonus_difficulty_if_dataset_is_big but "
                "this field is not supported for dataset-level steps."
            )
        names_by_input_type[input_type].append(name)

    # Pass 2: connect each step to the steps that trigger it (edge: parent -> child).
    for name, spec in self.specification.items():
        for processing_step_name in get_triggered_by_as_list(spec.get("triggered_by")):
            if not graph.has_node(processing_step_name):
                raise ValueError(
                    f"Processing step {name} is triggered by {processing_step_name} but {processing_step_name} is"
                    " not defined."
                )
            graph.add_edge(processing_step_name, name)

    if not nx.is_directed_acyclic_graph(graph):
        raise ValueError("The graph is not a directed acyclic graph.")

    self._nx_graph = graph
    self._processing_steps = steps_by_name
    self._processing_step_names_by_input_type = names_by_input_type

    # Entry points of the graph: the steps that nothing triggers.
    root_names = [node_name for node_name, in_degree in graph.in_degree() if in_degree == 0]
    self._first_processing_steps = [self._processing_steps[node_name] for node_name in root_names]
    for root_step in self._first_processing_steps:
        if root_step.input_type != "dataset":
            raise ValueError("The first processing steps must be dataset-level. The graph state is incoherent.")

    # Precompute the two orderings of the steps.
    self._topologically_ordered_processing_steps = list(
        map(self.get_processing_step, nx.topological_sort(graph))
    )
    self._alphabetically_ordered_processing_steps = list(
        map(self.get_processing_step, sorted(graph.nodes()))
    )

    # The instance attribute is a flag; the identically named module-level
    # function performs the actual check.
    if self.check_one_of_parents_is_same_or_higher_level:
        check_one_of_parents_is_same_or_higher_level(self)