in libs/libcommon/src/libcommon/orchestrator.py
def _create_plan(self) -> None:
    pending_jobs_to_delete_df = self.pending_jobs_df.copy()
    job_infos_to_create: list[JobInfo] = []
    # Artifacts whose cache entry is missing, retryable after an error, outdated with
    # respect to a parent, produced by an obsolete job runner, or computed for a
    # different git revision: all of them need a (re)computation job.
    artifact_states = (
        list(self.cache_status.cache_is_empty.values())
        + list(self.cache_status.cache_is_error_to_retry.values())
        + list(self.cache_status.cache_is_outdated_by_parent.values())
        + list(self.cache_status.cache_is_job_runner_obsolete.values())
        + list(self.cache_status.cache_has_different_git_revision.values())
    )

    @lru_cache
    def is_big(config: str) -> bool:
        # Memoized per config: is it large enough to deserve the bonus difficulty?
        num_bytes = get_num_bytes_from_config_infos(dataset=self.dataset, config=config)
        if num_bytes is None:
            return False
        else:
            return num_bytes > self.processing_graph.min_bytes_for_bonus_difficulty

    for artifact_state in artifact_states:
        valid_pending_jobs_df = artifact_state.job_state.valid_pending_jobs_df
        if valid_pending_jobs_df.empty:
            # No valid pending job for this artifact: plan the creation of a new one.
            difficulty = artifact_state.processing_step.difficulty
            if isinstance(artifact_state.config, str) and is_big(config=artifact_state.config):
                difficulty += artifact_state.processing_step.bonus_difficulty_if_dataset_is_big
            if artifact_state.cache_state.cache_entry_metadata is not None:
                failed_runs = artifact_state.cache_state.cache_entry_metadata["failed_runs"]
            else:
                failed_runs = 0
            # Increase the difficulty with the number of failed runs, capped at DEFAULT_DIFFICULTY_MAX.
            difficulty = min(DEFAULT_DIFFICULTY_MAX, difficulty + failed_runs * DIFFICULTY_BONUS_BY_FAILED_RUNS)
            job_infos_to_create.append(
                {
                    "job_id": "not used",  # placeholder: this value is not used when creating the jobs
                    "type": artifact_state.processing_step.job_type,
                    "params": {
                        "dataset": self.dataset,
                        "revision": self.revision,
                        "config": artifact_state.config,
                        "split": artifact_state.split,
                    },
                    "priority": self.priority,
                    "difficulty": difficulty,
                    "started_at": None,
                }
            )
        else:
            # A valid job is already pending for this artifact: keep it, i.e. remove it
            # from the set of pending jobs to delete.
            pending_jobs_to_delete_df.drop(valid_pending_jobs_df.index, inplace=True)
    # Better keep this order: delete, then create.
    # Note that all the waiting jobs for other revisions will be deleted.
    # The started jobs are ignored, for now.
    if not pending_jobs_to_delete_df.empty:
        self.add_task(DeleteWaitingJobsTask(jobs_df=pending_jobs_to_delete_df))
    if job_infos_to_create:
        self.add_task(CreateJobsTask(job_infos=job_infos_to_create))
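For reference, a minimal sketch of how the failed-run escalation above behaves. The constant values here are assumptions chosen only for illustration; the real DEFAULT_DIFFICULTY_MAX and DIFFICULTY_BONUS_BY_FAILED_RUNS are defined elsewhere in libcommon and may differ.

# Sketch only: assumed constant values, not the ones defined in libcommon.
DEFAULT_DIFFICULTY_MAX = 100          # assumed cap on difficulty
DIFFICULTY_BONUS_BY_FAILED_RUNS = 20  # assumed bonus per previous failed run

def escalated_difficulty(base_difficulty: int, failed_runs: int) -> int:
    # Each previous failed run raises the job's difficulty, never beyond the cap.
    return min(DEFAULT_DIFFICULTY_MAX, base_difficulty + failed_runs * DIFFICULTY_BONUS_BY_FAILED_RUNS)

if __name__ == "__main__":
    for failed_runs in range(4):
        print(failed_runs, escalated_difficulty(50, failed_runs))
    # prints: 0 50, 1 70, 2 90, 3 100 (capped)

With these assumed values, a job whose step difficulty is 50 reaches the cap after three failed runs; the cap keeps repeatedly failing jobs from growing an unbounded difficulty.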