in tensorflow_datasets/text/c4.py
def _get_page_content(self, pipeline, file_paths, dl_manager):
  """Build PCollection of un-split page content."""
  beam = tfds.core.lazy_imports.apache_beam

  def download_wet_file(path, dl_dir):
    """Downloads a WET file, reusing an existing copy if present."""
    url = f"{_DOWNLOAD_HOST}/{path}"
    out_path = f"{dl_dir}/{path}"

    if tf.io.gfile.exists(out_path):
      c4_utils.get_counter_inc_fn("download_wet_url")("exists")
      return out_path

    # Download into a unique temporary directory and rename into place so a
    # partially written file is never mistaken for a completed download.
    tmp_dir = f"{out_path}.incomplete{uuid.uuid4().hex}"
    try:
      tf.io.gfile.makedirs(tmp_dir)
      downloader = tfds.download.downloader.get_downloader()
      with downloader.tqdm():
        # TODO(slebedev): Investigate why pytype infers Promise[Future[...]].
        dl_path = downloader.download(url, tmp_dir).get().path  # type: ignore
      tf.io.gfile.rename(os.fspath(dl_path), out_path, overwrite=True)
    finally:
      if tf.io.gfile.exists(tmp_dir):
        tf.io.gfile.rmtree(tmp_dir)

    c4_utils.get_counter_inc_fn("download_wet_url")("downloaded")
    return out_path
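
  # Read the WET path listings, drop known-corrupt files, and download each
  # WET file from Common Crawl.
  # Output: local path of the downloaded WET file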
  wet_file_paths = (
      pipeline
      | "create_wet_path_urls" >> beam.Create(file_paths["wet_path_urls"])
      | beam.io.ReadAllFromText(
          compression_type=beam.io.filesystem.CompressionTypes.UNCOMPRESSED)
      # Increase parallelism.
      | beam.Reshuffle()
      | "filter_corrupt_wet_files" >>
      beam.Filter(lambda p: p not in _KNOWN_CORRUPT_WET_FILES)
      | beam.Map(
          download_wet_file,
          dl_dir=os.path.join(dl_manager.download_dir, "c4_wet_files")))

  # Parse WET files and filter by length.
  # Output: url, text
  page_content = (
      wet_file_paths
      | beam.FlatMap(c4_utils.split_wet_file)
      | beam.Filter(c4_utils.is_valid_length))

  # Optionally filter for RealNews domains.
  # Output: url, text
  if self.builder_config.realnewslike:
    with tf.io.gfile.GFile(file_paths["realnews_domains"]) as f:
      realnews_domains = json.load(f)
    page_content = (
        page_content
        | beam.Filter(c4_utils.is_realnews_domain, realnews_domains))

  # Normalize and deduplicate by URL.
  # Output: url, text
  page_content = (
      page_content
      | "normalize_url" >> beam.Map(c4_utils.normalize_url)
      | "group_url" >> beam.GroupByKey()
      | beam.Map(c4_utils.dedupe_urls))

  # Optionally filter for WebText-like URLs.
  # Output: url, text
  if self.builder_config.webtextlike:
    webtextlike_urls = (
        pipeline
        | "read_webtextlike_urls" >> beam.io.ReadFromText(
            os.path.join(file_paths["openwebtext_urls_zip"],
                         _OPENWEBTEXT_URLS_FILE_PATTERN))
        | "add_dummy_page" >> beam.Map(lambda x: (x, ""))
        | "normal_webtext_url" >> beam.Map(c4_utils.normalize_url))
    page_content = (
        {
            "text": page_content,
            "webtextlike_urls": webtextlike_urls,
        }
        | "group_webtextlike_urls" >> beam.CoGroupByKey()
        | beam.FlatMap(c4_utils.filter_by_webtextlike))
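
  # Optionally filter out pages with too few or too short paragraphs.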
  if self.builder_config.paragraph_filter:
    page_content |= beam.Filter(c4_utils.paragraph_filter)
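
  # Optionally clean pages of boilerplate and malformed lines.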
  if self.builder_config.clean:
    page_content = (
        page_content
        | "clean_pages" >> beam.FlatMap(c4_utils.get_clean_page_fn()))
  if self.builder_config.dedupe:
    page_content = (
        # Also removes documents with too few sentences after deduplication.
        c4_utils.remove_duplicate_text(page_content)  # pylint:disable=g-long-ternary
        if self.builder_config.clean else
        # If we are not cleaning, do not remove too-few-sentence documents.
        c4_utils.remove_duplicate_text(page_content, min_num_sentences=0))

  # Add detected language.
  if self.builder_config.languages == ["en"]:
    # Use langdetect for reproducibility of the original C4.
    page_content |= beam.FlatMap(c4_utils.detect_english)
  else:
    page_content = c4_utils.detect_languages(
        page_content, valid_languages=self.builder_config.languages)
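
  # Optionally filter out pages containing bad words.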
  if self.builder_config.badwords_filter:
    # Build a per-language set of badwords used to construct the filtering
    # regex.
    badwords = collections.defaultdict(set)
    for lang, path in file_paths["badwords"].items():
      lang = lang.split("-")[0]  # Remove suffix if present.
      with tf.io.gfile.GFile(path) as f:
        badwords[lang].update(l.strip() for l in f)

    page_content |= beam.Filter(c4_utils.get_badwords_filter_fn(badwords))

  return page_content