def regroup()

in cc_net/mine.py [0:0]


def regroup(conf: Config, all_dirs: List[Path]) -> Path:
    """Plan and launch the resharding of the files produced by 'mine'.

    Every ``*.json.gz`` file found directly inside ``all_dirs`` is bucketed by
    the part of its file name preceding the first ``"."`` (the "split";
    presumably a language/quality pair, to be confirmed against the miner's
    output naming). For each split, ``regroup_module.determine_groups``
    decides which files get merged together given ``conf.target_size``, and
    each group is assigned an output named ``{split}_{index:04}.json.gz`` in
    the regroup directory. The merging itself is delegated to ``_regroup``
    through the executor returned by ``conf.get_executor``.

    Args:
        conf: Pipeline configuration. Read here for the regroup directory,
            ``target_size``, ``cleanup_after_regroup``, ``dump`` and the
            executor factory.
        all_dirs: Non-empty list of directories to scan for mined files.

    Returns:
        The directory returned by ``conf.get_mined_dir(regroup=True)``.

    Raises:
        AssertionError: If ``all_dirs`` is empty, or if inputs are kept
            (``cleanup_after_regroup`` is falsy) and a split already has more
            output shards on disk than groups planned for it.
    """
    regroup_dir = conf.get_mined_dir(regroup=True)
    assert all_dirs

    mined_files: List[Path] = []
    for directory in all_dirs:
        mined_files.extend(directory.glob("*.json.gz"))
    if len(mined_files) == 0:
        # Only reported: the function still carries on with empty job lists.
        print(f"No .json.gz file found in {all_dirs[0]}")

    # Bucket the files by split name, keeping first-seen order of the splits.
    by_split: Dict[str, List[Path]] = {}
    for path in mined_files:
        prefix = path.name.partition(".")[0]
        by_split.setdefault(prefix, []).append(path)

    print(f"Identified {len(mined_files)} files to regroup from {len(by_split)} splits.")

    max_bytes = jsonql.parse_size(conf.target_size)
    # inputs[k] is the list of files merged into outputs[k].
    inputs: List[List[Path]] = []
    outputs: List[Path] = []

    for split, members in by_split.items():
        groups = list(regroup_module.determine_groups(members, target_size=max_bytes))
        if not groups:
            continue

        pattern = f"{split}_????.json.gz"
        kept = sorted(regroup_dir.glob(pattern))

        if not conf.cleanup_after_regroup:
            # Inputs are all still available, so previously written shards
            # can be overwritten: numbering restarts from zero.
            assert len(kept) <= len(groups)
            kept = []

        if kept and len(groups) == 1:
            # A single new group may be folded into the last existing shard,
            # provided the merged size stays below the target.
            only_group = groups[0]
            last_shard = kept[-1]
            incoming_bytes = sum(part.stat().st_size for part in only_group)
            merged_bytes = incoming_bytes + last_shard.stat().st_size
            if merged_bytes < max_bytes:
                print(f"Will append {only_group} to {last_shard}")
                # The shard becomes the first input of the group and its index
                # is freed, so the group is written back under the same name.
                only_group.insert(0, kept.pop())

        # Number new shards after the ones that must be preserved.
        n_existing = len(kept)
        for index, group in enumerate(groups, start=n_existing):
            inputs.append(group)
            outputs.append(regroup_dir / f"{split}_{index:04}.json.gz")
        print(
            str(regroup_dir / pattern),
            "->",
            len(groups),
            f"shards ({n_existing} already there).",
        )

    executor = conf.get_executor(
        f"regroup_{conf.dump}", mem_gb=1, timeout_hour=12, cpus=2
    )
    executor(_regroup, repeat(conf), inputs, outputs)

    return regroup_dir