in src/datatrove/pipeline/readers/huggingface.py [0:0]
def run(self, data: DocumentsPipeline = None, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
    from datasets import load_dataset  # type: ignore

    # Pass through any documents coming from a previous pipeline step.
    if data:
        yield from data
    ds = load_dataset(self.dataset, **self.dataset_options, streaming=self.streaming)
    if self.shuffle_files:
        if not self.streaming:
            ds = ds.shuffle(seed=42)
        else:
            # Streaming datasets can only shuffle within a fixed-size buffer.
            ds = ds.shuffle(seed=42, buffer_size=1000)
    # In case the dataset is an (Iterable)DatasetDict, raise an informative error
    if isinstance(ds, dict):
        raise ValueError(
            f"You forgot to specify the split of the dataset. Update your dataset_options to include 'split'. "
            f"Available splits: {list(ds.keys())}"
        )
    shard = self._get_dataset_shard(ds, rank, world_size)
    if not shard:
        return
    with tqdm(total=self.limit if self.limit != -1 else None, disable=not self.doc_progress) as pbar:
        li = 0
        for batch in shard.iter(self.batch_size):
            if self.limit != -1 and li >= self.limit:
                break
            documents = []
            with self.track_time("batch"):
                # `batch` is column-oriented (a dict of lists); transpose it into one dict per row.
                for line in (dict(zip(batch, t)) for t in zip(*batch.values())):
                    if self.limit != -1 and li >= self.limit:
                        break
                    document = self.get_document_from_dict(line, self.dataset, f"{rank:05d}/{li}")
                    if not document:
                        continue
                    documents.append(document)
                    self.update_doc_stats(document)
                    self.stat_update("documents")
                    li += 1
                    pbar.update()
            yield from documents
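
The `_get_dataset_shard` helper called above is not part of this excerpt. As a rough idea of what per-rank sharding can look like with the `datasets` library, here is a minimal sketch; the function name, the `isinstance` dispatch and the exact arguments are assumptions for illustration, not the reader's actual implementation.

# Hypothetical sketch only: the real _get_dataset_shard is not shown in this excerpt.
from datasets import Dataset, IterableDataset
from datasets.distributed import split_dataset_by_node


def _get_dataset_shard_sketch(ds, rank: int, world_size: int):
    """Return the slice of `ds` that worker `rank` (out of `world_size`) should read."""
    if isinstance(ds, Dataset):
        # Map-style dataset: each rank reads one contiguous shard of the rows.
        return ds.shard(num_shards=world_size, index=rank, contiguous=True)
    if isinstance(ds, IterableDataset):
        # Streaming dataset: distribute the underlying file shards/examples across ranks.
        return split_dataset_by_node(ds, rank=rank, world_size=world_size)
    return None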