in cc_net/mine.py [0:0]
def move_segments(conf: Config, all_dirs: Sequence[Path]) -> Path:
    """Reshards each language/quality after 'mine'.

    Every regular file found directly inside each directory of ``all_dirs``
    is moved into the regrouped mined directory reported by ``conf``, and a
    symlink pointing at the moved file is left at its original location.

    Args:
        conf: run configuration; supplies the destination directory
            (``get_mined_dir(regroup=True)``), the executor and the dump name
            used to label the job.
        all_dirs: directories whose files must be moved. Must be non-empty
            and contain only existing directories.

    Returns:
        The regrouped directory that now holds the files.

    Raises:
        AssertionError: if ``all_dirs`` is empty or contains a non-directory.
    """
    regroup_dir = conf.get_mined_dir(regroup=True)

    # Validate the inputs before touching the filesystem.
    assert all_dirs, "Received no dirs to move"
    assert all(folder.is_dir() for folder in all_dirs), (
        f"move_segments was expecting dirs received files: {all_dirs[:10]}..."
    )

    # Create the destination and its direct parent (no deeper ancestors).
    for folder in (regroup_dir.parent, regroup_dir):
        folder.mkdir(exist_ok=True)

    def _move_segments(subdir: Path, dest_dir: Path) -> str:
        """Move the regular files of `subdir` into `dest_dir`, leaving symlinks.

        Returns a summary message, or "" when no ".json.gz" file was moved.
        """
        # Symlinks are skipped: a file moved by a previous run has been
        # replaced by a symlink, so running the job again is a no-op for it.
        pending = (
            entry
            for entry in subdir.iterdir()
            if entry.is_file() and not entry.is_symlink()
        )

        segment_count = 0
        for source in pending:
            # Every regular file is moved, but only segments are counted.
            if source.name.endswith(".json.gz"):
                segment_count += 1

            destination = dest_dir / source.name
            resolved_destination = destination.resolve()
            # Moving a file onto itself would make the symlink self-referential.
            assert source.resolve() != resolved_destination

            source.rename(destination)
            source.symlink_to(resolved_destination)

        return (
            f"Moved {segment_count} .json.gz files from {subdir} to {dest_dir}"
            if segment_count
            else ""
        )

    job_name = f"moveseg_{conf.dump}"
    executor = conf.get_executor(job_name, mem_gb=1, timeout_hour=1, cpus=2)
    # Presumably the executor maps the function over the zipped argument
    # iterables (one call per source dir) -- confirm against get_executor.
    executor(_move_segments, all_dirs, repeat(regroup_dir))

    print(f"Results are in {regroup_dir}")
    return regroup_dir