in services/worker/src/worker/job_runners/config/parquet_and_info.py
from typing import Any

# Assumed import paths, matching what the full module would provide:
from datasets.download.streaming_download_manager import ArchiveIterable, FilesIterable
from fsspec.core import url_to_fs


def get_urlpaths_in_gen_kwargs(gen_kwargs: dict[str, Any]) -> list[str]:
"""
    Return the deduplicated list of file sources found in the input gen_kwargs.
In case of chained URLs like `zip://xxx::hf://yyy`, only `hf://yyy` is returned.
"""
    # Having lists of different sizes makes sharding ambiguous, so raise an error in this case (same as in the `datasets` lib)
lists = [value for value in gen_kwargs.values() if isinstance(value, list)] or [[]]
if len(set(len(list_) for list_ in lists)) > 1:
raise RuntimeError(
(
"Sharding is ambiguous for this dataset: "
+ "we found several data sources lists of different lengths, and we don't know over which list we should list shards.\n"
+ "To fix this, check the 'gen_kwargs' and make sure to use lists only for data sources, "
+ "and use tuples otherwise. In the end there should only be one single list, or several lists with the same length."
)
)
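    # All lists have the same length at this point, so `max` simply picks one of
    # them (or the empty fallback when gen_kwargs contains no list at all)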
shards = max(lists, key=len)
urlpaths: set[str] = set()
for shard in shards:
        # Standard list of shards, e.g. ["data0.json", "data1.json", ...]
if isinstance(shard, str):
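            # Keep only the innermost source of a chained URL,
            # e.g. "zip://train.csv::hf://yyy" -> "hf://yyy" (see the docstring)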
urlpaths.add(shard.split("::")[-1])
# Each item can also be an iterator
# (typically used in builders that support iterating on extracted zip files)
elif isinstance(shard, FilesIterable):
urlpaths.update(item.split("::")[-1] for item in shard)
# ImageFolder / AudioFolder list of shards
# Each item is a tuple like (optional original file, downloaded file)
elif shard and isinstance(shard, tuple):
if isinstance(shard[-1], FilesIterable):
urlpaths.update(item.split("::")[-1] for item in shard[-1])
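            # shard[-1] can also be a plain, non-empty list of downloaded file paths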
elif shard[-1] and isinstance(shard[-1][0], str):
urlpaths.update(item.split("::")[-1] for item in shard[-1])
# WebDataset list of shards
# (it iterates on TAR archives)
elif isinstance(shard, ArchiveIterable) and shard.args and isinstance(shard.args[0], str):
urlpaths.add(shard.args[0].split("::")[-1])
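    # Re-attach each urlpath's protocol so the returned entries are fully
    # qualified (e.g. a bare local path becomes "file://..." via its fsspec filesystem)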
return [url_to_fs(urlpath)[0].unstrip_protocol(urlpath) for urlpath in urlpaths]
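

# A minimal usage sketch (not part of the original module), assuming the "hf"
# protocol is registered with fsspec via an installed huggingface_hub. The
# gen_kwargs below are hypothetical, mimicking what a `datasets` builder would pass:
if __name__ == "__main__":
    example_gen_kwargs = {
        "files": [
            "zip://train.csv::hf://datasets/user/repo/data.zip",
            "hf://datasets/user/repo/extra.csv",
        ],
        "split": "train",  # non-list values are ignored when picking shards
    }
    # Chained URLs are reduced to their innermost source, so this is expected
    # to print the two hf:// urlpaths (order may vary: urlpaths is a set)
    print(get_urlpaths_in_gen_kwargs(example_gen_kwargs))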