def _run_download()

in pipeline/data/hplt.py


    def _run_download(self):
        logger.info(f"Using HPLT locale {self.hplt_locale}")
        shuffled_shard_urls = load_shuffled_shard_urls(self.hplt_locale)
        self.stats.shards.filtered = len(shuffled_shard_urls)

        # The shard URLs are shuffled and then streamed through the read_lines
        # iterator, which spans multiple shards. The first shard is opened and
        # its documents are read in order; the iterator then continues with the
        # next shards until enough fluent sentences have been collected, at
        # which point the remaining shards are never visited (or downloaded).
        document_stream = self.stack.enter_context(
            read_lines(shuffled_shard_urls, on_enter_location=self.stats.count_shards_visited)
        )
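
        # Rough mental model (hypothetical; a runnable sketch follows the
        # listing): read_lines behaves like a generator that opens each shard
        # only when the iteration reaches it:
        #
        #     for url in shard_urls:
        #         on_enter_location(url)       # count the shard as visited
        #         yield from shard_lines(url)  # hypothetical streamer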

        for document_json in document_stream:
            self.stats.document_count.value += 1
            document = HPLTDocument(**json.loads(document_json))
            overall_doc_score = document.doc_scores[0]
            doc_lang = document.lang[0]

            # Possibly flush text accumulated from earlier documents.
            self._maybe_write_accumulated_text()

            # HPLT 2.0 provides document-level quality scores; a standalone
            # sketch of this gate follows the listing.
            if overall_doc_score < self.hplt_min_doc_score:
                self.stats.filtered_doc_score.value += 1
                continue

            # Keep only documents written primarily in the target language.
            if doc_lang != self.hplt_locale:
                self.stats.filtered_doc_locale.value += 1
                continue

            # Visit the lines in the document.
            for line_locale, line in zip(document.seg_langs, document.lines):
                self.visited_lines += 1
                self._process_line(line_locale, line)
                if self.visited_lines % 5_000_000 == 0:
                    logger.info(f"Visited {self.visited_lines:,} lines")
                    logger.info(f"Kept {self.stats.visited_lines.kept:,}.")
                    logger.info(
                        f"Wrote {self.stats.final_lines.value:,} out of {self.max_lines:,}."
                    )
                    log_memory()

                # Stop reading lines once the output quota is reached.
                if self.stats.final_lines.value == self.max_lines:
                    break

            # Re-check the quota to propagate the inner break and stop
            # visiting any further documents (and shards).
            if self.stats.final_lines.value == self.max_lines:
                break

            # Possibly flush the text accumulated from this document.
            self._maybe_write_accumulated_text()

        self.stats.visited_lines.filtered = self.visited_lines - self.stats.visited_lines.kept
        logger.info(f"Wrote {self.stats.final_lines.value:,} lines to: {self.file_destination}")
        stat_path = self.stats.save_json()
        logger.info(f"Saved filtering stats to: {stat_path}")