in libs/libcommon/src/libcommon/state.py [0:0]
def __post_init__(self, pending_jobs_df: pd.DataFrame, cache_entries_df: pd.DataFrame) -> None:
    """Populate the derived state of this config from the jobs and cache dataframes.

    Three attributes are set, in this order, each under its own ``StepProfiler``:

    - ``artifact_state_by_step``: one ``ArtifactState`` per processing step returned by
      ``processing_graph.get_input_type_processing_steps(input_type="config")``, keyed by step name.
    - ``split_names``: the names returned by ``fetch_names`` for this dataset and config.
    - ``split_states``: one ``SplitState`` per entry of ``split_names``.

    Args:
        pending_jobs_df: pending jobs; the "split" and "type" columns are read here.
        cache_entries_df: cache entries; the "kind" and "split" columns are read here.

    Raises:
        UnexceptedSplitNamesError: if ``split_names`` is non-empty and the "split" column of
            ``cache_entries_df`` holds a value that is neither one of those names nor ``None``.
    """
    profiled_method = "ConfigState.__post_init__"

    with StepProfiler(method=profiled_method, step="get_config_level_artifact_states"):
        config_steps = self.processing_graph.get_input_type_processing_steps(input_type="config")
        states_by_name = {}
        for step in config_steps:
            step_name = step.name
            # Config-level jobs are those of this step's type that carry no split.
            is_config_level_job = (pending_jobs_df["split"].isnull()) & (pending_jobs_df["type"] == step.job_type)
            has_step_kind = cache_entries_df["kind"] == step.cache_kind
            states_by_name[step_name] = ArtifactState(
                processing_step=step,
                dataset=self.dataset,
                revision=self.revision,
                config=self.config,
                split=None,
                pending_jobs_df=pending_jobs_df[is_config_level_job],
                cache_entries_df=cache_entries_df[has_step_kind],
            )
        # Assigned only once every state is built, so a failure leaves the attribute unset.
        self.artifact_state_by_step = states_by_name

    with StepProfiler(method=profiled_method, step="get_split_names"):
        # Note that the cached content is used even if the revision is different (i.e. maybe obsolete).
        self.split_names = fetch_names(
            dataset=self.dataset,
            config=self.config,
            cache_kind=CONFIG_SPLIT_NAMES_KIND,
            names_field="splits",
            name_field="split",
        )
        # The list is empty if the config-split-names cache is missing: nothing to validate against then.
        if self.split_names:
            splits_in_cache = set(cache_entries_df["split"].unique())
            # None is tolerated alongside the known split names.
            allowed_splits = set(self.split_names).union({None})
            unexpected_split_names = splits_in_cache.difference(allowed_splits)
            if unexpected_split_names:
                # The message lists at most 10 of the offending names.
                raise UnexceptedSplitNamesError(
                    f"Unexpected split names for dataset={self.dataset} config={self.config} ({len(unexpected_split_names)}): {list(islice(unexpected_split_names, 10))}{'' if len(unexpected_split_names) <= 10 else '...'}"
                )

    with StepProfiler(method=profiled_method, step="get_split_states"):
        states_per_split = []
        for split_name in self.split_names:
            # Each split only sees the rows of both dataframes that carry its own name.
            jobs_of_split = pending_jobs_df[pending_jobs_df["split"] == split_name]
            entries_of_split = cache_entries_df[cache_entries_df["split"] == split_name]
            states_per_split.append(
                SplitState(
                    dataset=self.dataset,
                    revision=self.revision,
                    config=self.config,
                    split=split_name,
                    processing_graph=self.processing_graph,
                    pending_jobs_df=jobs_of_split,
                    cache_entries_df=entries_of_split,
                )
            )
        self.split_states = states_per_split