def compute_parquet_response()

in services/worker/src/worker/job_runners/dataset/parquet.py [0:0]


def compute_parquet_response(dataset: str) -> tuple[DatasetParquetResponse, float]:
    """
    Get the response of 'dataset-parquet' for one specific dataset on huggingface.co.

    Args:
        dataset (`str`):
            A namespace (user or an organization) and a repo name separated by a `/`.

    Raises:
        [~`libcommon.simple_cache.CachedArtifactError`]:
          If the previous step gave an error.
        [~`libcommon.exceptions.PreviousStepFormatError`]:
            If the content of the previous step has not the expected format

    Returns:
        `tuple[DatasetParquetResponse, float]`: A tuple with the parquet_response (list of parquet files) and progress.
    """
    logging.info(f"compute 'dataset-parquet' for {dataset=}")

    config_names_response = get_previous_step_or_raise(kind="dataset-config-names", dataset=dataset)
    content = config_names_response["content"]
    if "config_names" not in content:
        raise PreviousStepFormatError("Previous step did not return the expected content: 'config_names'.")

    try:
        parquet_files: list[SplitHubFile] = []
        total = 0
        pending = []
        failed = []
        partial = False
        for config_item in content["config_names"]:
            config = config_item["config"]
            total += 1
            try:
                response = get_response(kind="config-parquet", dataset=dataset, config=config)
            except CachedArtifactNotFoundError:
                logging.debug("No response found in previous step for this dataset: 'config-parquet' endpoint.")
                pending.append(
                    PreviousJob(
                        {
                            "kind": "config-parquet",
                            "dataset": dataset,
                            "config": config,
                            "split": None,
                        }
                    )
                )
                continue
            if response["http_status"] != HTTPStatus.OK:
                logging.debug(f"Previous step gave an error: {response['http_status']}.")
                failed.append(
                    PreviousJob(
                        {
                            "kind": "config-parquet",
                            "dataset": dataset,
                            "config": config,
                            "split": None,
                        }
                    )
                )
                continue
            config_parquet_content = ConfigParquetResponse(
                parquet_files=response["content"]["parquet_files"],
                partial=response["content"]["partial"],
                features=None,  # we can keep it None since we don't pass it to DatasetParquetResponse anyway
            )
            parquet_files.extend(config_parquet_content["parquet_files"])
            partial = partial or config_parquet_content["partial"]
    except Exception as e:
        raise PreviousStepFormatError("Previous step did not return the expected content.", e) from e

    progress = (total - len(pending)) / total if total else 1.0

    return (
        DatasetParquetResponse(parquet_files=parquet_files, pending=pending, failed=failed, partial=partial),
        progress,
    )