def run()

in src/datatrove/pipeline/readers/huggingface.py [0:0]

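Reads documents from a Hugging Face dataset: any documents coming from a previous step are forwarded, then the dataset is loaded (optionally streaming and shuffled), sharded across ranks, and yielded as Document objects batch by batch, honoring the configured limit.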

    def run(self, data: DocumentsPipeline = None, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
        from datasets import load_dataset  # type: ignore

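        # forward any documents produced by a previous pipeline step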
        if data:
            yield from data
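        # load the dataset, forwarding any user-provided options; streaming avoids downloading everything up front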
        ds = load_dataset(self.dataset, **self.dataset_options, streaming=self.streaming)

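        # optionally shuffle; streaming datasets only support buffered shuffling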
        if self.shuffle_files:
            if not self.streaming:
                ds = ds.shuffle(seed=42)
            else:
                ds = ds.shuffle(seed=42, buffer_size=1000)

        # if no split was selected, load_dataset returns a DatasetDict/IterableDatasetDict: raise an informative error
        if isinstance(ds, dict):
            raise ValueError(
                f"You forgot to specify the split of the dataset. Update your dataset_options to include 'split'. Available splits: {list(ds.keys())}"
            )

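        # keep only this rank's shard of the dataset; ranks that get no data exit early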
        shard = self._get_dataset_shard(ds, rank, world_size)
        if not shard:
            return
        with tqdm(total=self.limit if self.limit != -1 else None, disable=not self.doc_progress) as pbar:
            li = 0
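            # iterate over the shard in batches; each batch is a dict mapping column name -> list of values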
            for batch in shard.iter(self.batch_size):
                if self.limit != -1 and li >= self.limit:
                    break
                documents = []
                with self.track_time("batch"):
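                    # turn the columnar batch back into one dict per row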
                    for line in (dict(zip(batch, t)) for t in zip(*batch.values())):
                        if self.limit != -1 and li >= self.limit:
                            break
                        document = self.get_document_from_dict(line, self.dataset, f"{rank:05d}/{li}")
                        if not document:
                            continue
                        documents.append(document)
                        self.update_doc_stats(document)
                        self.stat_update("documents")
                        li += 1
                        pbar.update()
                yield from documents
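
For context, here is a minimal sketch of how this reader is typically wired into a pipeline so that run() is invoked once per rank. The HuggingFaceDatasetReader constructor arguments and LocalPipelineExecutor used below are assumptions based on datatrove's public API and are not part of the snippet above:

    from datatrove.executor import LocalPipelineExecutor
    from datatrove.pipeline.readers import HuggingFaceDatasetReader

    # read the "train" split in streaming mode and map the "text" column to Document.text
    reader = HuggingFaceDatasetReader(
        dataset="wikitext",                                                  # forwarded to datasets.load_dataset
        dataset_options={"name": "wikitext-103-raw-v1", "split": "train"},   # 'split' is required (see the ValueError above)
        streaming=True,     # with streaming, shuffling uses a buffer instead of a full shuffle
        limit=1000,         # stop after 1000 documents per rank; -1 means no limit
        batch_size=100,     # rows pulled per shard.iter() call
        text_key="text",    # column used as the document text
    )

    # runs the pipeline with 4 tasks, one rank per task, each processing its own shard
    LocalPipelineExecutor(pipeline=[reader], tasks=4).run()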