in services/worker/src/worker/job_runners/dataset/parquet.py [0:0]
def compute_parquet_response(dataset: str) -> tuple[DatasetParquetResponse, float]:
    """
    Get the response of 'dataset-parquet' for one specific dataset on huggingface.co.

    The configs are listed by the 'dataset-config-names' cache entry. For each config, the cached
    'config-parquet' response is read:
      - no cached response: the config is reported in `pending`;
      - cached response with a non-OK HTTP status: the config is reported in `failed`;
      - otherwise its parquet files are appended to the result, and the dataset response is
        marked `partial` as soon as one config response is partial.

    Args:
        dataset (`str`):
            A namespace (user or an organization) and a repo name separated by a `/`.
    Raises:
        [~`libcommon.simple_cache.CachedArtifactError`]:
            If the previous step gave an error.
        [~`libcommon.exceptions.PreviousStepFormatError`]:
            If the content of the previous step does not have the expected format. Any other
            exception raised while walking the configs (except a missing 'config-parquet' entry,
            which marks the config as pending) is also re-raised as this error.
    Returns:
        `tuple[DatasetParquetResponse, float]`: A tuple with the parquet_response (list of parquet files)
        and the progress, i.e. the fraction of configs that are not pending (1.0 when there are no configs).
    """
    logging.info(f"compute 'dataset-parquet' for {dataset=}")

    names_content = get_previous_step_or_raise(kind="dataset-config-names", dataset=dataset)["content"]
    if "config_names" not in names_content:
        raise PreviousStepFormatError("Previous step did not return the expected content: 'config_names'.")

    def config_parquet_job(config_name):
        # Identifies the 'config-parquet' cache entry of one config; split does not apply at this level.
        return PreviousJob(
            {
                "kind": "config-parquet",
                "dataset": dataset,
                "config": config_name,
                "split": None,
            }
        )

    files: list[SplitHubFile] = []
    pending_jobs: list[PreviousJob] = []
    failed_jobs: list[PreviousJob] = []
    is_partial = False
    num_configs = 0
    try:
        for item in names_content["config_names"]:
            config_name = item["config"]
            num_configs += 1
            try:
                cached = get_response(kind="config-parquet", dataset=dataset, config=config_name)
            except CachedArtifactNotFoundError:
                # Not computed yet: counted as pending, which lowers the reported progress.
                logging.debug("No response found in previous step for this dataset: 'config-parquet' endpoint.")
                pending_jobs.append(config_parquet_job(config_name))
                continue

            status = cached["http_status"]
            if status != HTTPStatus.OK:
                # Computed but errored: reported as failed, without affecting progress.
                logging.debug(f"Previous step gave an error: {status}.")
                failed_jobs.append(config_parquet_job(config_name))
                continue

            cached_content = cached["content"]
            config_response = ConfigParquetResponse(
                parquet_files=cached_content["parquet_files"],
                partial=cached_content["partial"],
                features=None,  # features are never forwarded to DatasetParquetResponse
            )
            files.extend(config_response["parquet_files"])
            if not is_partial:
                is_partial = config_response["partial"]
    except Exception as e:
        raise PreviousStepFormatError("Previous step did not return the expected content.", e) from e

    if num_configs:
        progress = (num_configs - len(pending_jobs)) / num_configs
    else:
        progress = 1.0

    dataset_response = DatasetParquetResponse(
        parquet_files=files,
        pending=pending_jobs,
        failed=failed_jobs,
        partial=is_partial,
    )
    return dataset_response, progress