def run()

in misc/reference_datasets/multilingual/copy_raw_data.py [0:0]


    def run(self, data=None, rank: int = 0, world_size: int = 1):
        """
        Yield the documents belonging to this rank's shard of the dump being processed.

        Anything received from previous pipeline stages is yielded first, unchanged.
        The shard is then built from the file list at "/path/to/base_proc_filelist.txt"
        (one path per line): only paths whose second "/"-separated component equals
        `self.dump_to_proc` are considered, and those are distributed round-robin over
        the ranks in file order (the i-th matching path goes to rank `i % world_size`).
        Blank lines and lines without a "/" are ignored.

        Args:
            data: any existing data from previous pipeline stages
            rank: rank of the current task
            world_size: total number of tasks

        Yields:
            Every document produced by `self.read_files_shard` for this rank's files,
            after it has been passed to `self.update_doc_stats`.

        Raises:
            RuntimeError: if rank 0 ends up with an empty shard. Other ranks only log
                a warning in that case.
        """
        from loguru import logger

        if data:
            yield from data

        files_shard = []
        # Counts only the paths that belong to the requested dump, so the round-robin
        # assignment is balanced across ranks regardless of what else the list contains.
        matching_idx = 0
        with open("/path/to/base_proc_filelist.txt", "rt") as f:
            for line in f:
                # Strip before splitting: otherwise the trailing newline stays glued to
                # the last component and blank lines blow up on the [1] lookup.
                path = line.strip()
                components = path.split("/")
                if len(components) < 2 or components[1] != self.dump_to_proc:
                    continue
                if (matching_idx - rank) % world_size == 0:
                    files_shard.append(path)
                matching_idx += 1

        logger.info(f"Loaded {len(files_shard)} for {rank=}, {world_size=}")
        if not files_shard:
            if rank == 0:
                raise RuntimeError(f"No files found on {self.data_folder.path}!")
            # otherwise just a warning
            logger.warning(f"No files found on {self.data_folder.path} for {rank=}")

        for doc in self.read_files_shard(files_shard):
            self.update_doc_stats(doc)
            yield doc