in libs/libcommon/src/libcommon/orchestrator.py [0:0]
def __post_init__(self) -> None:
    """Build the backfill plan for the dataset.

    Runs after dataclass initialization: fetches the pending jobs and the
    cache entries for the dataset, builds the dataset state, computes the
    cache status and finally creates the plan. Each phase is wrapped in a
    StepProfiler for observability.

    Side effects:
        Sets ``self.pending_jobs_df``, ``self.cache_entries_df``,
        ``self.dataset_state`` and ``self.cache_status``, then calls
        ``self._create_plan()``.
    """
    super().__post_init__()
    with StepProfiler(
        method="DatasetBackfillPlan.__post_init__",
        step="all",
    ):
        # When only the first processing steps are considered, restrict the
        # queries to those steps. Compute the list once instead of calling
        # get_first_processing_steps() separately for job types and cache
        # kinds, as the original code did.
        first_processing_steps = (
            self.processing_graph.get_first_processing_steps()
            if self.only_first_processing_steps
            else None
        )
        with StepProfiler(
            method="DatasetBackfillPlan.__post_init__",
            step="get_pending_jobs_df",
        ):
            # None means "no filter": fetch pending jobs of every job type.
            job_types = (
                [processing_step.job_type for processing_step in first_processing_steps]
                if first_processing_steps is not None
                else None
            )
            self.pending_jobs_df = Queue().get_pending_jobs_df(
                dataset=self.dataset,
                job_types=job_types,
            )
        with StepProfiler(
            method="DatasetBackfillPlan.__post_init__",
            step="get_cache_entries_df",
        ):
            # Same restriction logic as for job_types, on the cache side.
            cache_kinds = (
                [processing_step.cache_kind for processing_step in first_processing_steps]
                if first_processing_steps is not None
                else None
            )
            self.cache_entries_df = get_cache_entries_df(
                dataset=self.dataset,
                cache_kinds=cache_kinds,
            )
        with StepProfiler(
            method="DatasetBackfillPlan.__post_init__",
            step="get_dataset_state",
        ):
            # FirstStepsDatasetState is the restricted variant of DatasetState;
            # both take the same constructor arguments.
            dataset_state_class = (
                FirstStepsDatasetState
                if self.only_first_processing_steps
                else DatasetState
            )
            self.dataset_state = dataset_state_class(
                dataset=self.dataset,
                processing_graph=self.processing_graph,
                revision=self.revision,
                pending_jobs_df=self.pending_jobs_df,
                cache_entries_df=self.cache_entries_df,
            )
        with StepProfiler(
            method="DatasetBackfillPlan.__post_init__",
            step="_get_cache_status",
        ):
            self.cache_status = self._get_cache_status()
        with StepProfiler(
            method="DatasetBackfillPlan.__post_init__",
            step="_create_plan",
        ):
            self._create_plan()