def _scan_cached_repo()

in src/huggingface_hub/utils/_cache_manager.py [0:0]


def _scan_cached_repo(repo_path: Path) -> CachedRepoInfo:
    """Walk one cached repo folder and summarize it as a [`CachedRepoInfo`].

    The folder name must contain `--`: the part before the first `--` is the
    pluralized repo type (e.g. `models`), the remainder is the repo id with `/`
    encoded as `--`. The folder must contain a `snapshots` directory; a `refs`
    directory is optional.

    Args:
        repo_path (`Path`):
            Path to the cached repo folder to scan.

    Returns:
        [`CachedRepoInfo`]: the repo description, holding one
        [`CachedRevisionInfo`] per snapshot folder and one [`CachedFileInfo`]
        per file found inside each snapshot.

    Raises:
        [`~CorruptedCacheException`]:
            If `repo_path` is not a directory, if its name has no `--`
            separator, if the repo type is not `dataset`, `model` or `space`,
            if the `snapshots` directory is missing, if `refs` is a file, if a
            plain file sits directly under `snapshots`, if a snapshot entry
            resolves to a blob that does not exist, or if a ref contains a
            commit hash that has no matching snapshot folder.
    """
    if not repo_path.is_dir():
        raise CorruptedCacheException(f"Repo path is not a directory: {repo_path}")

    folder_name = repo_path.name
    if "--" not in folder_name:
        raise CorruptedCacheException(f"Repo path is not a valid HuggingFace cache directory: {repo_path}")

    plural_type, encoded_repo_id = folder_name.split("--", maxsplit=1)
    repo_type = plural_type[:-1]  # drop the trailing "s": "models" -> "model"
    repo_id = encoded_repo_id.replace("--", "/")  # "google--fleurs" -> "google/fleurs"

    if repo_type not in {"dataset", "model", "space"}:
        raise CorruptedCacheException(
            f"Repo type must be `dataset`, `model` or `space`, found `{repo_type}` ({repo_path})."
        )

    snapshots_path = repo_path / "snapshots"
    refs_path = repo_path / "refs"

    if not (snapshots_path.exists() and snapshots_path.is_dir()):
        raise CorruptedCacheException(f"Snapshots dir doesn't exist in cached repo: {snapshots_path}")

    # --- Step 1: collect refs ------------------------------------------------
    # Maps the content of each ref file (a commit hash) to the set of ref names
    # pointing at it. Ref names are paths relative to `refs/`, so nested refs
    # are supported, e.g.:
    #   refs
    #   ├── main
    #   └── refs
    #       └── pr
    #           └── 1
    refs_by_hash: Dict[str, Set[str]] = defaultdict(set)
    if refs_path.exists():
        if refs_path.is_file():
            raise CorruptedCacheException(f"Refs directory cannot be a file: {refs_path}")

        for ref_entry in refs_path.glob("**/*"):
            # The recursive glob also yields intermediate directories; only
            # files carry a commit hash. OS helper files are skipped as well.
            if ref_entry.is_dir() or ref_entry.name in FILES_TO_IGNORE:
                continue

            ref_name = str(ref_entry.relative_to(refs_path))
            with ref_entry.open() as ref_file:
                pointed_hash = ref_file.read()

            refs_by_hash[pointed_hash].add(ref_name)

    # --- Step 2: collect snapshots -------------------------------------------
    # Stat results are cached per resolved blob path so that a blob shared by
    # several snapshot entries is stat-ed only once and counted only once in
    # the repo-level totals.
    blob_stats: Dict[Path, os.stat_result] = {}
    cached_revisions: Set[CachedRevisionInfo] = set()

    for revision_path in snapshots_path.iterdir():
        # Ignore OS-created helper files
        if revision_path.name in FILES_TO_IGNORE:
            continue
        if revision_path.is_file():
            raise CorruptedCacheException(f"Snapshots folder corrupted. Found a file: {revision_path}")

        cached_files = set()
        for file_path in revision_path.glob("**/*"):
            # Directories are only containers; files are what we report.
            if file_path.is_dir():
                continue

            # Follow the snapshot entry to the file it actually points to.
            blob_path = Path(file_path).resolve()
            if not blob_path.exists():
                raise CorruptedCacheException(f"Blob missing (broken symlink): {blob_path}")

            stats = blob_stats.get(blob_path)
            if stats is None:
                stats = blob_path.stat()
                blob_stats[blob_path] = stats

            cached_files.add(
                CachedFileInfo(
                    file_name=file_path.name,
                    file_path=file_path,
                    size_on_disk=stats.st_size,
                    blob_path=blob_path,
                    blob_last_accessed=stats.st_atime,
                    blob_last_modified=stats.st_mtime,
                )
            )

        # A revision is as recent as its most recently modified blob; an empty
        # revision falls back to the modification time of its own folder.
        if cached_files:
            revision_last_modified = max(blob_stats[entry.blob_path].st_mtime for entry in cached_files)
        else:
            revision_last_modified = revision_path.stat().st_mtime

        # Sum sizes over distinct blobs so a blob referenced by several files
        # of the same revision is not counted twice.
        revision_blobs = {entry.blob_path for entry in cached_files}
        revision_size = sum(blob_stats[blob].st_size for blob in revision_blobs)

        # Popping consumes the refs matched by this revision; whatever is left
        # in `refs_by_hash` after the loop points to a missing snapshot.
        revision_refs = refs_by_hash.pop(revision_path.name, set())

        cached_revisions.add(
            CachedRevisionInfo(
                commit_hash=revision_path.name,
                files=frozenset(cached_files),
                refs=frozenset(revision_refs),
                size_on_disk=revision_size,
                snapshot_path=revision_path,
                last_modified=revision_last_modified,
            )
        )

    # --- Step 3: every ref must have matched a snapshot ------------------------
    if refs_by_hash:
        raise CorruptedCacheException(
            f"Reference(s) refer to missing commit hashes: {dict(refs_by_hash)} ({repo_path})."
        )

    # --- Step 4: repo-level timestamps ---------------------------------------
    # Use the most recent blob access/modification times; with no blob at all,
    # fall back to the timestamps of the repo folder itself.
    if blob_stats:
        repo_last_accessed = max(entry_stat.st_atime for entry_stat in blob_stats.values())
        repo_last_modified = max(entry_stat.st_mtime for entry_stat in blob_stats.values())
    else:
        repo_stats = repo_path.stat()
        repo_last_accessed = repo_stats.st_atime
        repo_last_modified = repo_stats.st_mtime

    # Build and return frozen structure
    total_size = sum(entry_stat.st_size for entry_stat in blob_stats.values())
    return CachedRepoInfo(
        nb_files=len(blob_stats),
        repo_id=repo_id,
        repo_path=repo_path,
        repo_type=repo_type,  # type: ignore
        revisions=frozenset(cached_revisions),
        size_on_disk=total_size,
        last_accessed=repo_last_accessed,
        last_modified=repo_last_modified,
    )