in cc_net/mine.py [0:0]
def mine(conf: Config) -> List[Path]:
    """Remove dups, run LID and LMs, and split by lang and quality.

    Builds the list of expected output paths, schedules ``_mine_shard``
    through ``conf.get_executor`` for every expected path that does not exist
    yet, and returns the full list of expected paths.

    Args:
        conf: pipeline configuration. This function reads ``will_split``,
            ``num_shards``, ``experiments``, ``hash_in_mem``, ``output_dir``,
            ``dump``, ``pipeline`` and ``mine_num_processes``, and calls
            ``get_mined_dir()`` and ``get_executor()``.

    Returns:
        The expected output paths: one per shard (directories when
        ``conf.will_split`` is set or in the "mini_again" experiment,
        ``.json.gz`` files otherwise), or one per value of ``HASHES_IN_MEM``
        in the "hashes" experiment.

    Raises:
        AssertionError: if an output that was scheduled for mining still does
            not exist after the executor call.
    """
    mini_again = "mini_again" in conf.experiments

    mined_dir = conf.get_mined_dir()
    if conf.will_split:
        # Give directories when splitting
        outputs = [mined_dir / f"{shard:04d}" for shard in range(conf.num_shards)]
    else:
        # Files otherwise
        outputs = [
            mined_dir / f"{shard:04d}.json.gz" for shard in range(conf.num_shards)
        ]

    if mini_again:
        mined_dir = conf.output_dir / "mini_again" / conf.dump
        outputs = [mined_dir / f"{shard:04d}" for shard in range(conf.num_shards)]

    # TODO: try to reduce this / make it a function of "hash_in_mem" / num_langs
    mem_gb = 60 + 1 * conf.hash_in_mem
    timeout_hour = 5
    if "hashes" in conf.experiments:
        # HACK: used for generating paper figures
        outputs = [
            conf.output_dir / f"hashes_exp/{conf.dump}_0000_dedup{h:03d}.json.gz"
            for h in HASHES_IN_MEM
        ]
        mem_gb = int(max(HASHES_IN_MEM) * 1.2)
        timeout_hour = 8

    # Only schedule what is not on disk yet. The "mini_again" experiment
    # additionally restricts the work to shards 5 and 139. The shard test comes
    # first so that skipped shards never hit the filesystem.
    missing_outputs = [
        (shard, o)
        for shard, o in enumerate(outputs)
        if (not mini_again or shard in (5, 139)) and not o.exists()
    ]
    if not missing_outputs:
        return outputs

    mined_dir.mkdir(parents=True, exist_ok=True)
    ex = conf.get_executor(
        f"mine_{conf.dump}",
        mem_gb=mem_gb,
        timeout_hour=timeout_hour,
        cpus=conf.mine_num_processes + 1,
    )

    # Compute hashes first.
    if "dedup" in conf.pipeline:
        # Presumably jsonql.grouper chunks the hash files into groups of
        # `hash_in_mem` items (verify against jsonql); shard `i` then reads
        # group `i // hash_in_mem`.
        hashes_groups = list(jsonql.grouper(hashes(conf), conf.hash_in_mem))
        hashes_files: Iterable[List[Path]] = [
            hashes_groups[shard // conf.hash_in_mem] for shard, _ in missing_outputs
        ]
    else:
        # No dedup step: every job receives the same empty list of hash files.
        hashes_files = repeat([])

    ex(_mine_shard, repeat(conf), hashes_files, *_transpose(missing_outputs))

    # Verify only what was scheduled: outputs that were not scheduled either
    # already existed or were deliberately skipped ("mini_again"), so checking
    # them would fail a run that actually succeeded. An explicit raise (rather
    # than `assert`) keeps the check alive under `python -O`.
    not_produced = [o for _, o in missing_outputs if not o.exists()]
    if not_produced:
        raise AssertionError(
            f"mine_{conf.dump}: {len(not_produced)} of {len(missing_outputs)} "
            f"scheduled outputs were not produced, e.g. {not_produced[0]}"
        )
    return outputs