in src/datatrove/pipeline/readers/base.py [0:0]
def read_files_shard(self, shard: list[str]) -> DocumentsPipeline:
    """
    Reads a list of files and yields the Documents they contain.

    The first `self.skip` documents are dropped; the skip counter is shared
    across all files of the shard, not reset per file. Once `self.limit`
    documents have been yielded, reading stops (`self.limit == -1` disables
    the limit).

    For every file, the `input_files` stat is incremented and the number of
    documents yielded from that file is recorded under the `documents` stat.
    Two progress bars (per document and per file) are updated along the way;
    they are disabled unless `self.doc_progress` / `self.file_progress` is set.

    Args:
        shard: a list of file paths

    Returns: generator of Document
    """
    li = 0  # documents yielded so far across the whole shard (compared to self.limit)
    skipped = 0  # documents dropped so far across the whole shard (compared to self.skip)
    with (
        tqdm(
            # total is unknown when no limit is set
            total=self.limit if self.limit != -1 else None,
            desc="Document progress",
            unit="doc",
            disable=not self.doc_progress,
        ) as doc_pbar,
        tqdm(total=len(shard), desc="File progress", unit="file", disable=not self.file_progress) as file_pbar,
    ):
        for i, filepath in enumerate(shard):
            self.stat_update("input_files")
            logger.info(f"Reading input file {filepath}, {i + 1}/{len(shard)}")
            ndocs = 0  # documents yielded from this file only
            for document in self.read_file(filepath):
                if skipped < self.skip:
                    skipped += 1
                    continue
                if self.limit != -1 and li >= self.limit:
                    break
                yield document
                doc_pbar.update()
                li += 1
                ndocs += 1
            file_pbar.update()
            self.stat_update("documents", value=ndocs, unit="input_file")
            # the inner break only leaves the current file; stop visiting further files too
            if self.limit != -1 and li >= self.limit:
                break