in misc/reference_datasets/multilingual/copy_raw_data.py [0:0]
def run(
    self,
    data=None,
    rank: int = 0,
    world_size: int = 1,
    *,
    filelist_path: str = "/path/to/base_proc_filelist.txt",
):
    """
    Read this rank's shard of the files named in a file list, yielding Documents.

    The file list holds one path per line. A path is selected when its second
    "/"-separated component equals ``self.dump_to_proc``. Selected paths are
    numbered in file order and dealt round-robin over ``world_size`` tasks:
    this task keeps the ones whose index ``i`` satisfies
    ``(i - rank) % world_size == 0``.

    Args:
        data: any existing data from previous pipeline stages; it is yielded
            through unchanged before this reader's own documents
        rank: rank of the current task
        world_size: total number of tasks
        filelist_path: text file listing the candidate paths, one per line.
            The default keeps the original hard-coded location.

    Yields:
        every item of ``data`` (if truthy), then each document produced by
        ``self.read_files_shard`` for this shard, after
        ``self.update_doc_stats`` has been called on it

    Raises:
        RuntimeError: on rank 0 when its shard is empty. Rank 0 always receives
            the first selected path, so this means no path matched the dump.
            Other ranks only log a warning in that case.
    """
    from loguru import logger

    if data:
        yield from data

    files_shard = []
    # Index among *selected* paths only, so paths from other dumps do not
    # shift the round-robin assignment.
    selected_index = 0
    with open(filelist_path, "rt", encoding="utf-8") as f:
        for line in f:
            # Strip before splitting so the trailing newline cannot corrupt the
            # dump component.
            path = line.strip()
            parts = path.split("/")
            # Blank or single-component lines cannot name a dump; skip them
            # instead of raising IndexError.
            if len(parts) < 2 or parts[1] != self.dump_to_proc:
                continue
            if (selected_index - rank) % world_size == 0:
                files_shard.append(path)
            selected_index += 1

    logger.info(f"Loaded {len(files_shard)} for {rank=}, {world_size=}")
    if len(files_shard) == 0:
        if rank == 0:
            raise RuntimeError(f"No files found on {self.data_folder.path}!")
        # otherwise just a warning
        logger.warning(f"No files found on {self.data_folder.path} for {rank=}")

    for doc in self.read_files_shard(files_shard):
        self.update_doc_stats(doc)
        yield doc