in cc_net/mine.py [0:0]
def regroup(conf: Config, all_dirs: List[Path]) -> Path:
    """Plan the resharding of mined files and submit the regroup jobs.

    Every ``*.json.gz`` file found directly inside the given directories is
    bucketed by the part of its file name that precedes the first ``"."``.
    Each bucket is cut into groups by ``regroup_module.determine_groups``
    (with the size parsed from ``conf.target_size``) and every group is paired
    with a numbered output ``{split}_{index:04}.json.gz`` located in the
    regroup directory. The (inputs, output) pairs are then handed to the
    executor obtained from ``conf`` together with ``_regroup``.

    Args:
        conf: Run configuration. This function reads ``target_size``,
            ``cleanup_after_regroup`` and ``dump`` from it, and calls
            ``get_mined_dir`` and ``get_executor`` on it.
        all_dirs: Directories to scan for ``*.json.gz`` files; must not be
            empty.

    Returns:
        The regroup directory, as given by ``conf.get_mined_dir(regroup=True)``.

    Raises:
        AssertionError: If ``all_dirs`` is empty, or if inputs are kept
            (``conf.cleanup_after_regroup`` is falsy) and a split already has
            more output shards than the number of groups computed for it.
    """
    regroup_dir = conf.get_mined_dir(regroup=True)
    assert all_dirs

    all_files: List[Path] = []
    for mined_dir in all_dirs:
        all_files.extend(mined_dir.glob("*.json.gz"))
    if not all_files:
        # Only reported; the (empty) plan is still submitted below.
        print(f"No .json.gz file found in {all_dirs[0]}")

    # Bucket the files by the prefix of their name, up to the first dot.
    splits: Dict[str, List[Path]] = defaultdict(list)
    for mined_file in all_files:
        splits[mined_file.name.partition(".")[0]].append(mined_file)
    print(f"Identified {len(all_files)} files to regroup from {len(splits)} splits.")

    target_size = jsonql.parse_size(conf.target_size)
    inputs: List[List[Path]] = []
    outputs: List[Path] = []

    for split, split_files in splits.items():
        cuts = list(
            regroup_module.determine_groups(split_files, target_size=target_size)
        )
        if not cuts:
            continue

        shard_glob = f"{split}_????.json.gz"
        existing_outputs = sorted(regroup_dir.glob(shard_glob))

        if not conf.cleanup_after_regroup:
            # The inputs are all still available, hence existing outputs can
            # safely be overwritten: behave as if none was there.
            assert len(existing_outputs) <= len(cuts)
            existing_outputs = []

        if existing_outputs and len(cuts) == 1:
            # A single new group may be merged into the last existing shard,
            # provided the merged shard stays below the target size.
            incoming_size = sum(part.stat().st_size for part in cuts[0])
            merged_size = incoming_size + existing_outputs[-1].stat().st_size
            if merged_size < target_size:
                print(f"Will append {cuts[0]} to {existing_outputs[-1]}")
                cuts[0].insert(0, existing_outputs.pop(-1))

        # Number the new shards after the ones kept, so none is overwritten.
        n_existing = len(existing_outputs)
        for j, group in enumerate(cuts, start=n_existing):
            inputs.append(group)
            outputs.append(regroup_dir / f"{split}_{j:04}.json.gz")

        print(
            str(regroup_dir / shard_glob),
            "->",
            len(cuts),
            f"shards ({n_existing} already there).",
        )

    executor = conf.get_executor(
        f"regroup_{conf.dump}", mem_gb=1, timeout_hour=12, cpus=2
    )
    executor(_regroup, repeat(conf), inputs, outputs)

    return regroup_dir