in pipeline/data/hplt.py [0:0]
def _run_download(self):
    """Stream HPLT shards for the configured locale, filter documents and
    lines, and write the kept text until ``self.max_lines`` lines are written.

    Side effects: updates ``self.stats`` counters, writes accumulated text via
    ``self._maybe_write_accumulated_text()``, and saves a stats JSON at the end.
    """
    logger.info(f"Using HPLT locale {self.hplt_locale}")
    shard_urls = load_shuffled_shard_urls(self.hplt_locale)
    self.stats.shards.filtered = len(shard_urls)

    # Shards are consumed lazily in their shuffled order: documents are read
    # shard by shard, and once enough fluent sentences have been collected the
    # remaining shards are never fetched at all.
    docs = self.stack.enter_context(
        read_lines(shard_urls, on_enter_location=self.stats.count_shards_visited)
    )

    for raw_doc in docs:
        self.stats.document_count.value += 1
        doc = HPLTDocument(**json.loads(raw_doc))
        score = doc.doc_scores[0]
        primary_lang = doc.lang[0]

        self._maybe_write_accumulated_text()

        # HPLT 2.0 assigns quality scores at the document level.
        if score < self.hplt_min_doc_score:
            self.stats.filtered_doc_score.value += 1
            continue

        # Keep only documents written primarily in the target language.
        if primary_lang != self.hplt_locale:
            self.stats.filtered_doc_locale.value += 1
            continue

        # Walk the document's segments alongside their detected languages.
        for seg_lang, seg in zip(doc.seg_langs, doc.lines):
            self.visited_lines += 1
            self._process_line(seg_lang, seg)

            # Periodic progress report so long downloads stay observable.
            if self.visited_lines % 5_000_000 == 0:
                logger.info(f"Visited {self.visited_lines:,} lines")
                logger.info(f"Kept {self.stats.visited_lines.kept:,}.")
                logger.info(
                    f"Wrote {self.stats.final_lines.value:,} out of {self.max_lines:,}."
                )
                log_memory()

            if self.stats.final_lines.value == self.max_lines:
                break

        # Repeat the limit check to exit the outer (document) loop as well.
        if self.stats.final_lines.value == self.max_lines:
            break

    self._maybe_write_accumulated_text()
    self.stats.visited_lines.filtered = self.visited_lines - self.stats.visited_lines.kept
    logger.info(f"Wrote {self.stats.final_lines.value:,} lines to: {self.file_destination}")
    stat_path = self.stats.save_json()
    logger.info(f"Saved filtering stats to: {stat_path}")