in libs/libcommon/src/libcommon/orchestrator.py [0:0]
def __post_init__(self) -> None:
    super().__post_init__()
    self.dataset = self.job_info["params"]["dataset"]
    self.revision = self.job_info["params"]["revision"]
    self.priority = self.job_info["priority"]
    config = self.job_info["params"]["config"]
    split = self.job_info["params"]["split"]
    job_type = self.job_info["type"]
    try:
        processing_step = self.processing_graph.get_processing_step_by_job_type(job_type)
        next_processing_steps = self.processing_graph.get_children(processing_step.name)
    except ProcessingStepDoesNotExist as e:
        raise ValueError(f"Processing step with job type: {job_type} does not exist") from e
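    # e.g. (illustrative names, depending on the deployed processing graph) a
    # finished "dataset-config-names" job would resolve to the step of the same
    # name, whose children could include config-level steps like "config-split-names"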
    if len(next_processing_steps) == 0:
        # no next processing step, nothing to do
        return
    # get the dataset infos to estimate difficulty
    if config is not None:
        self.num_bytes = get_num_bytes_from_config_infos(dataset=self.dataset, config=config, split=split)
    else:
        self.num_bytes = None
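    # num_bytes estimates the size of the data the children jobs will process;
    # it is presumably used downstream (e.g. when the jobs are created) to
    # adjust the difficulty of jobs for big datasets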
    # get the list of pending jobs for the children
    # note that it can contain a lot of unrelated jobs; we will filter them out afterwards
    self.pending_jobs_df = Queue().get_pending_jobs_df(
        dataset=self.dataset,
        job_types=[next_processing_step.job_type for next_processing_step in next_processing_steps],
    )
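    # "unrelated" here means jobs of the right type but for another revision,
    # config or split; self.update() below is presumably where the relevant rows
    # are consumed, leaving the obsolete ones to be deleted at the end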
    self.job_infos_to_create: list[JobInfo] = []
    config_names: Optional[list[str]] = None
    split_names: Optional[list[str]] = None
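    # The loop below handles three kinds of transitions between the input type
    # of the finished step and the input type of each child step:
    #   same level: dataset -> dataset, config -> config, split -> split
    #   fan-in:     config -> dataset, split -> dataset, split -> config
    #   fan-out:    dataset -> config (one job per config), config -> split (one job per split)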
    # filter to only get the jobs that are not already in the queue
    for next_processing_step in next_processing_steps:
        if processing_step.input_type == next_processing_step.input_type:
            # same level, one job is expected
            # D -> D, C -> C, S -> S
            self.update(next_processing_step, config, split)
        elif processing_step.input_type in ["config", "split"] and next_processing_step.input_type == "dataset":
            # going to upper level (fan-in), one job is expected
            # S -> D, C -> D
            self.update(next_processing_step, None, None)
        elif processing_step.input_type == "split" and next_processing_step.input_type == "config":
            # going to upper level (fan-in), one job is expected
            # S -> C
            self.update(next_processing_step, config, None)
elif processing_step.input_type == "dataset" and next_processing_step.input_type == "config":
# going to lower level (fan-out), one job is expected per config, we need the list of configs
# D -> C
if config_names is None:
config_names = fetch_names(
dataset=self.dataset,
config=None,
cache_kind=DATASET_CONFIG_NAMES_KIND,
names_field="config_names",
name_field="config",
) # Note that we use the cached content even the revision is different (ie. maybe obsolete)
for config_name in config_names:
self.update(next_processing_step, config_name, None)
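            # e.g. for a dataset with configs ["default", "en"] (hypothetical
            # names), one child job is created for each of the two configs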
elif processing_step.input_type == "config" and next_processing_step.input_type == "split":
# going to lower level (fan-out), one job is expected per split, we need the list of splits
# C -> S
if split_names is None:
split_names = fetch_names(
dataset=self.dataset,
config=config,
cache_kind=CONFIG_SPLIT_NAMES_KIND,
names_field="splits",
name_field="split",
) # Note that we use the cached content even the revision is different (ie. maybe obsolete)
for split_name in split_names:
self.update(next_processing_step, config, split_name)
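            # e.g. for a config with splits ["train", "test"] (hypothetical
            # names), one child job is created for each of the two splits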
        else:
            raise NotImplementedError(
                f"Unsupported input types: {processing_step.input_type} -> {next_processing_step.input_type}"
            )
            # we don't support fan-out from dataset-level to split-level (no need for now)

    # Better keep this order: delete, then create
    # Note that all the waiting jobs for other revisions will be deleted
    # The started jobs are ignored, for now.
    if not self.pending_jobs_df.empty:
        self.add_task(DeleteWaitingJobsTask(jobs_df=self.pending_jobs_df))
    if self.job_infos_to_create:
        self.add_task(CreateJobsTask(job_infos=self.job_infos_to_create))
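
# --- Illustration only, not part of orchestrator.py ---
# A minimal, self-contained sketch of the level-transition rules implemented in
# the loop above. The helper name and the string-based input types are
# hypothetical; in the real code the types come from the processing graph.
from typing import Optional

def child_job_coordinates(
    parent_level: str, child_level: str, config: Optional[str], split: Optional[str]
) -> tuple[Optional[str], Optional[str]]:
    """Return the (config, split) coordinates of the single expected child job."""
    if parent_level == child_level:
        # same level: D -> D, C -> C, S -> S
        return (config, split)
    if parent_level in ("config", "split") and child_level == "dataset":
        # fan-in: C -> D, S -> D
        return (None, None)
    if parent_level == "split" and child_level == "config":
        # fan-in: S -> C
        return (config, None)
    # fan-out (D -> C, C -> S) creates one job per fetched name instead
    raise NotImplementedError(f"{parent_level} -> {child_level}")

assert child_job_coordinates("split", "config", "default", "train") == ("default", None)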