def _create_plan()

in libs/libcommon/src/libcommon/orchestrator.py

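Builds the plan for the current dataset revision: every artifact whose cache entry is missing, is a retryable error, is outdated by a parent, was produced by an obsolete job runner, or points to a different git revision either keeps its valid pending job or gets a new job scheduled; the remaining pending waiting jobs are then deleted. The difficulty of each new job is raised for big configs and for artifacts with previous failed runs.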

    def _create_plan(self) -> None:
        pending_jobs_to_delete_df = self.pending_jobs_df.copy()
        job_infos_to_create: list[JobInfo] = []
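        # every artifact whose cache entry must be (re)computed: missing, retryable error,
        # outdated by a parent, obsolete job runner version, or different git revision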
        artifact_states = (
            list(self.cache_status.cache_is_empty.values())
            + list(self.cache_status.cache_is_error_to_retry.values())
            + list(self.cache_status.cache_is_outdated_by_parent.values())
            + list(self.cache_status.cache_is_job_runner_obsolete.values())
            + list(self.cache_status.cache_has_different_git_revision.values())
        )

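        # memoize the per-config size lookup: several artifacts can share the same config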
        @lru_cache
        def is_big(config: str) -> bool:
            num_bytes = get_num_bytes_from_config_infos(dataset=self.dataset, config=config)
            return num_bytes is not None and num_bytes > self.processing_graph.min_bytes_for_bonus_difficulty

        for artifact_state in artifact_states:
            valid_pending_jobs_df = artifact_state.job_state.valid_pending_jobs_df
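            # no valid pending job for this artifact: schedule a new one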
            if valid_pending_jobs_df.empty:
                difficulty = artifact_state.processing_step.difficulty
                if isinstance(artifact_state.config, str) and is_big(config=artifact_state.config):
                    difficulty += artifact_state.processing_step.bonus_difficulty_if_dataset_is_big
                if artifact_state.cache_state.cache_entry_metadata is not None:
                    failed_runs = artifact_state.cache_state.cache_entry_metadata["failed_runs"]
                else:
                    failed_runs = 0
                # increase the difficulty according to the number of failed runs, capped at DEFAULT_DIFFICULTY_MAX
                difficulty = min(DEFAULT_DIFFICULTY_MAX, difficulty + failed_runs * DIFFICULTY_BONUS_BY_FAILED_RUNS)
                job_infos_to_create.append(
                    {
                        "job_id": "not used",
                        "type": artifact_state.processing_step.job_type,
                        "params": {
                            "dataset": self.dataset,
                            "revision": self.revision,
                            "config": artifact_state.config,
                            "split": artifact_state.split,
                        },
                        "priority": self.priority,
                        "difficulty": difficulty,
                        "started_at": None,
                    }
                )
            else:
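                # a valid job is already pending: keep it by removing it from the delete set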
                pending_jobs_to_delete_df.drop(valid_pending_jobs_df.index, inplace=True)
        # Keep this order: delete first, then create.
        # Note that all the waiting jobs for other revisions will be deleted.
        # Started jobs are ignored, for now.
        if not pending_jobs_to_delete_df.empty:
            self.add_task(DeleteWaitingJobsTask(jobs_df=pending_jobs_to_delete_df))
        if job_infos_to_create:
            self.add_task(CreateJobsTask(job_infos=job_infos_to_create))
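
For illustration, here is a minimal, self-contained sketch of the difficulty escalation applied above. The constant values below are assumptions chosen for the example, not the library's actual settings:

    # hedged sketch: standalone version of the difficulty escalation in _create_plan;
    # both constants are assumed values for illustration only
    DEFAULT_DIFFICULTY_MAX = 100  # assumed cap on difficulty
    DIFFICULTY_BONUS_BY_FAILED_RUNS = 20  # assumed bonus per previous failed run

    def escalated_difficulty(base_difficulty: int, failed_runs: int) -> int:
        # each previous failed run raises the difficulty, up to the cap
        return min(DEFAULT_DIFFICULTY_MAX, base_difficulty + failed_runs * DIFFICULTY_BONUS_BY_FAILED_RUNS)

    assert escalated_difficulty(50, 0) == 50   # no failed runs: difficulty unchanged
    assert escalated_difficulty(50, 2) == 90   # 50 + 2 * 20
    assert escalated_difficulty(50, 5) == 100  # 50 + 5 * 20 = 150, capped at 100

The cap keeps the difficulty bounded no matter how many runs have already failed.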