def __post_init__()

in libs/libcommon/src/libcommon/orchestrator.py [0:0]


    def __post_init__(self) -> None:
        """Plan the queue changes that follow from the job described by ``self.job_info``.

        The method reads the job parameters, looks up the children of the job's processing
        step in ``self.processing_graph`` and calls ``self.update`` once per expected
        (config, split) pair of every child step. It then registers a
        ``DeleteWaitingJobsTask`` when ``self.pending_jobs_df`` is not empty, and a
        ``CreateJobsTask`` when ``self.job_infos_to_create`` is not empty.

        When the step has no children the method returns early, before ``num_bytes``,
        ``pending_jobs_df`` and ``job_infos_to_create`` are assigned.

        Raises:
            ValueError: if the processing graph raises ``ProcessingStepDoesNotExist`` while
                resolving the step (or its children) for the job type.
            NotImplementedError: if the pair of input types (parent -> child) is not one of
                the handled transitions, e.g. dataset -> split.
        """
        super().__post_init__()

        params = self.job_info["params"]
        self.dataset = params["dataset"]
        self.revision = params["revision"]
        self.priority = self.job_info["priority"]

        config = params["config"]
        split = params["split"]
        job_type = self.job_info["type"]

        try:
            processing_step = self.processing_graph.get_processing_step_by_job_type(job_type)
            children = self.processing_graph.get_children(processing_step.name)
        except ProcessingStepDoesNotExist as err:
            raise ValueError(f"Processing step with job type: {job_type} does not exist") from err

        if len(children) == 0:
            # leaf step: there is no child job to plan
            return

        # the dataset infos are used to estimate the difficulty; they are only looked up
        # when the job has a config
        self.num_bytes = (
            get_num_bytes_from_config_infos(dataset=self.dataset, config=config, split=split)
            if config is not None
            else None
        )

        # pending jobs of the dataset for all the children job types; this can include many
        # unrelated jobs.
        # NOTE(review): self.update is defined elsewhere; presumably it drops from this
        # dataframe the jobs that must be kept and fills self.job_infos_to_create - confirm.
        self.pending_jobs_df = Queue().get_pending_jobs_df(
            dataset=self.dataset,
            job_types=[child.job_type for child in children],
        )

        self.job_infos_to_create: list[JobInfo] = []
        # the lists of names are fetched lazily, at most once, and shared between children
        config_names: Optional[list[str]] = None
        split_names: Optional[list[str]] = None

        parent_level = processing_step.input_type
        for child in children:
            child_level = child.input_type
            # (config, split) pairs for which one job of the child step is expected
            targets: list[tuple[Optional[str], Optional[str]]]

            if parent_level == child_level:
                # same level, a single job: D -> D, C -> C, S -> S
                targets = [(config, split)]
            elif child_level == "dataset" and parent_level in ["config", "split"]:
                # fan-in to the dataset level, a single job: S -> D, C -> D
                targets = [(None, None)]
            elif parent_level == "split" and child_level == "config":
                # fan-in to the config level, a single job: S -> C
                targets = [(config, None)]
            elif parent_level == "dataset" and child_level == "config":
                # fan-out, one job per config: D -> C
                if config_names is None:
                    # the cached content is used even if its revision is different
                    # (i.e. it may be obsolete)
                    config_names = fetch_names(
                        dataset=self.dataset,
                        config=None,
                        cache_kind=DATASET_CONFIG_NAMES_KIND,
                        names_field="config_names",
                        name_field="config",
                    )
                targets = [(config_name, None) for config_name in config_names]
            elif parent_level == "config" and child_level == "split":
                # fan-out, one job per split: C -> S
                if split_names is None:
                    # the cached content is used even if its revision is different
                    # (i.e. it may be obsolete)
                    split_names = fetch_names(
                        dataset=self.dataset,
                        config=config,
                        cache_kind=CONFIG_SPLIT_NAMES_KIND,
                        names_field="splits",
                        name_field="split",
                    )
                targets = [(config, split_name) for split_name in split_names]
            else:
                # fan-out from the dataset level to the split level is not supported
                # (no need for now)
                raise NotImplementedError(f"Unsupported input types: {parent_level} -> {child_level}")

            for target_config, target_split in targets:
                self.update(child, target_config, target_split)

        # Keep this order: delete first, then create.
        # All the waiting jobs for other revisions will be deleted.
        # The started jobs are ignored, for now.
        if not self.pending_jobs_df.empty:
            self.add_task(DeleteWaitingJobsTask(jobs_df=self.pending_jobs_df))
        if self.job_infos_to_create:
            self.add_task(CreateJobsTask(job_infos=self.job_infos_to_create))