in libs/libcommon/src/libcommon/orchestrator.py
def __post_init__(self) -> None:
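    """Check that the update can be applied as a "smart update" and schedule the corresponding tasks.

    As the checks below show: the cache must already hold the parent revision (old_revision), the
    commit may only touch README.md, .gitattributes or .gitignore, and none of the YAML fields
    listed in YAML_FIELDS_TO_CHECK may change in the dataset card. If a check fails, one of the
    SmartUpdateImpossibleBecause* exceptions is raised; otherwise, tasks are added to update the
    revision of the existing cache entries and of the storage (assets + cached assets).
    """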
    super().__post_init__()
    cache_kinds = [
        processing_step.cache_kind for processing_step in self.processing_graph.get_first_processing_steps()
    ]
    # Try to be robust to a burst of webhooks or out-of-order webhooks
    # by waiting up to 2 seconds for a coherent state
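    # (at most 3 attempts, 1 second apart)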
    for retry in range(3):
        cache_entries_df = get_cache_entries_df(
            dataset=self.dataset,
            cache_kinds=cache_kinds,
        )
        if len(cache_entries_df) == 0:
            raise SmartUpdateImpossibleBecauseCacheIsEmpty(f"Failed to smart update to {self.revision[:7]}")
        cached_git_revisions = cache_entries_df["dataset_git_revision"].unique()
        if len(cached_git_revisions) > 1:
            raise SmartUpdateImpossibleBecauseCacheHasMultipleRevisions(
                f"Expected only 1 revision in the cache but got {len(cached_git_revisions)}: "
                + ", ".join(cached_git_revisions)
            )
        self.cached_revision = cache_entries_df.sort_values("updated_at").iloc[-1]["dataset_git_revision"]
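        # Three possible outcomes: the cache already shows the new revision (nothing left to do),
        # it shows the expected parent revision (the smart update can proceed), or it shows
        # another revision (wait and retry, the webhooks may have arrived out of order).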
        if self.cached_revision == self.revision:
            return
        elif self.cached_revision == self.old_revision:
            break
        logging.warning(
            f"[{retry + 1}/3] Retrying smart update of {self.dataset} in 1s "
            f"(received {str(self.old_revision)[:7]}->{self.revision[:7]} but cache is {self.cached_revision[:7]})"
        )
        time.sleep(1)
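    # "else" of the for loop: after 3 attempts the cache never showed the expected parent revision.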
    else:
        message = (
            f"Failed to smart update {self.dataset} to {self.revision[:7]} "
            f"because the cached revision {self.cached_revision[:7]} is not its parent"
        )
        logging.warning(message)
        raise SmartUpdateImpossibleBecauseCachedRevisionIsNotParentOfNewRevision(message)
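    # The cache is at old_revision, the parent of the new revision: look at what the commit changes.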
    self.diff = self.get_diff()
    self.files_impacted_by_commit = self.get_impacted_files()
    if self.files_impacted_by_commit - {
        "README.md",
        ".gitattributes",
        ".gitignore",
    }:  # TODO: maybe support .huggingface.yaml later
        raise SmartUpdateImpossibleBecauseOfUpdatedFiles(", ".join(self.files_impacted_by_commit)[:1000])
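    # README.md may change, but its YAML header (the dataset card metadata) still matters:
    # reject the update if any of the checked fields changed.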
    self.updated_yaml_fields_in_dataset_card = self.get_updated_yaml_fields_in_dataset_card()
    for yaml_field in YAML_FIELDS_TO_CHECK:
        if yaml_field in self.updated_yaml_fields_in_dataset_card:
            raise SmartUpdateImpossibleBecauseOfUpdatedYAMLField(yaml_field)
    # We update the cache entries and the storage (assets + cached assets)
    # We don't update the jobs because they might be creating artifacts that won't be updated by this code,
    # so we let them finish and restart later.
    self.add_task(
        UpdateRevisionOfDatasetCacheEntriesTask(
            dataset=self.dataset, old_revision=self.old_revision, new_revision=self.revision
        )
    )
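    # Same revision update for every configured storage client (assets and cached assets).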
    if self.storage_clients:
        for storage_client in self.storage_clients:
            self.add_task(
                UpdateRevisionOfDatasetStorageTask(
                    dataset=self.dataset,
                    old_revision=self.old_revision,
                    new_revision=self.revision,
                    storage_client=storage_client,
                )
            )