in cc_net/mine.py [0:0]
def _mine_shard(conf: Config, hashes: List[Path], shard: int, output: Path) -> str:
    assert conf.pipeline
    tmp_output = tmp(output)
    if "hashes" in conf.experiments:
        # HACK: used for generating paper figures
        hashes_in_mem = shard
        hashes = hashes[: HASHES_IN_MEM[hashes_in_mem]]
        shard = 0
    cc_shard = conf.get_cc_shard(shard)
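
    # Build the pipeline as a registry of named transformers; conf.pipeline
    # selects which of them run, and in what order (see run_pipes below).
    # Steps left as None are filtered out before execution.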
    steps: Dict[str, Optional[jsonql.Transformer]] = {}
    lang_id = Path("bin") / "lid.bin"
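    # Language ID is run both before and after deduplication (top 5
    # predictions kept) so the effect of dedup on the language distribution
    # can be measured.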
steps["lid_before_dedup"] = split_by_lang.Classifier(
model=lang_id, field="raw_content", out_field="lid_before_dedup", top=5
)
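    # Remove paragraphs whose hashes appear in the precomputed hash files,
    # i.e. content already seen elsewhere in the dump.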
steps["dedup"] = dedup.DuplicatesRemover(field="raw_content", hashes_files=hashes)
steps["lid"] = split_by_lang.Classifier(
model=lang_id,
field="raw_content",
out_field="language",
top=1,
threshold=conf.lang_threshold,
)
steps["lid_after_dedup"] = split_by_lang.Classifier(
model=lang_id, field="raw_content", out_field="lid_after_dedup", top=5
)
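
    # Language filtering: a blacklist takes precedence over a whitelist;
    # with neither configured, the step is skipped.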
    if conf.lang_blacklist:
        steps["keep_lang"] = jsonql.where(
            [lambda doc: doc.get("language") not in set(conf.lang_blacklist)]
        )
    elif conf.lang_whitelist:
        steps["keep_lang"] = jsonql.where(
            [lambda doc: doc.get("language") in set(conf.lang_whitelist)]
        )
    else:
        steps["keep_lang"] = None
    tok_field = "tokenized"
    steps["sp"] = perplexity.MultiSentencePiece(
        {l: conf.lm_dir / f"{l}.sp.model" for l in conf.get_lm_languages()},
        field="raw_content",
        output_field=tok_field,
        normalize=True,
    )
steps["lm"] = perplexity.DocLM(
{l: conf.lm_dir / f"{l}.arpa.bin" for l in conf.get_lm_languages()},
field=tok_field,
output_field="perplexity",
normalize=False, # Normalization is done before SentencePiece
# load_method=kenlm.LoadMethod.PARALLEL_READ,
)
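    # Bucket documents by perplexity using the per-language cutoffs from
    # CUTOFF_CSV, then drop the intermediate tokenized field.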
steps["pp_bucket"] = perplexity.PerplexityBucket(CUTOFF_CSV)
steps["drop"] = perplexity.DropKeys(tok_field)
steps["keep_bucket"] = None
if conf.keep_bucket:
steps["keep_bucket"] = jsonql.where(
[lambda doc: doc.get("bucket", "all") in conf.keep_bucket]
)
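
    # Minification / metadata: MetadataFetcher re-joins raw CommonCrawl
    # documents with previously computed metadata, while Minifier strips
    # documents down to a compact form.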
if "fetch_metadata" in conf.pipeline:
# TODO: better default
assert conf.metadata is not None
steps["fetch_metadata"] = minify.MetadataFetcher(
f"{conf.metadata}/{conf.dump}/"
)
steps["minify"] = minify.Minifier()
    pattern = str(tmp_output / "{language}_{bucket}.json.gz")
    steps["split_by_lang"] = jsonql.split(pattern=pattern, mkdir=True)
    steps["split_by_segment"] = jsonql.split(
        split_fn=lambda doc: _get_segment(tmp_output, doc), mkdir=True
    )
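
    # Keep only the steps requested by conf.pipeline, skipping those set
    # to None.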
    pipeline = filter(None, (steps[s] for s in conf.pipeline))

    jsonql.run_pipes(
        *pipeline,
        inputs=cc_shard,
        processes=conf.mine_num_processes,
        chunksize=100,
        # The splitter takes care of writing to files.
        output=tmp_output if not conf.will_split else None,
    )
    finalize(tmp_output, output)
    return f"Mined {output}"