in src/datatrove/pipeline/extractors/base.py [0:0]
def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
"""Iterates through each document in data and calls `timeout_extract` on it.
Args:
data: DocumentsPipeline:
rank: int: (Default value = 0)
world_size: int: (Default value = 1)
Returns:
"""
with ExtractorSandbox(timeout=self.timeout) as extractor:
for doc in data:
self.stat_update(StatHints.total)
with self.track_time():
try:
doc.text = extractor.process_document(doc.text, self.extract)
self.stat_update("extracted")
except TimeoutError:
self.stat_update("timeout")
logger.warning("⏰ Timeout while cleaning record text. Skipping record.")
continue
except EOFError:
# Process died unexpectedly
self.stat_update("broken_process")
logger.warning("Process died unexpectedly, will create new process for next document")
continue
except Exception as e:
self.stat_update("clean_error")
if not self._warned_error:
logger.warning(
f'❌ Error "{e}" while cleaning record text. Skipping record. '
f"This message will only appear once."
)
self._warned_error = True
continue
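# documents whose extraction produced no text are dropped instead of forwarded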
if doc.text:
self.stat_update(StatHints.forwarded)
self.update_doc_stats(doc)
yield doc
else:
self.stat_update(StatHints.dropped)
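
# --- Illustrative only: a minimal sketch of a concrete extractor built on the run() loop
# above. The base class name (`BaseExtractor`), its constructor signature, and the class
# name below are assumptions based on this excerpt, not necessarily the repository's exact
# names; only `self.extract` and `self.timeout` are taken directly from the code shown.
import re


class WhitespaceNormalizingExtractor(BaseExtractor):  # hypothetical subclass
    def __init__(self, timeout: float = 1.0):
        # assumes the base constructor accepts the per-document timeout
        super().__init__(timeout)

    def extract(self, text: str) -> str:
        # called by run() inside the sandboxed subprocess for each document;
        # returning an empty string causes the document to be dropped
        return re.sub(r"\s+", " ", text).strip()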