def distributed_wait_for_completion()

in src/nanotron/s3_checkpoints/s3_mover.py


    def distributed_wait_for_completion(self, group: Optional[ProcessGroup] = None):
        """Wait for the previous checkpoint to be fully uploaded and removed in a distributed setting.
        Waits until all processes are ready.
        """
        if group is None:
            group = dist.torch_dist.distributed_c10d._get_default_group()

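        # Gather each rank's "previous save finished" flag so every rank sees the global status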
        test_tensor = torch.tensor([self.is_previous_save_finished()], device=torch.device("cuda"))
        test_tensor_list = [torch.zeros_like(test_tensor) for _ in range(group.size())]
        dist.all_gather(test_tensor_list, test_tensor, group=group, async_op=False)
        dist.barrier()
        all_saved = sum(bool(tensor.item()) for tensor in test_tensor_list)
        if all_saved != group.size() and self.state != self.S3MoverState.IDLE:
            self._warning(
                f"Waiting previous checkpoint saving is finished - S3Mover {dist.get_rank(group)} still in {self.state} state.",
            )
        while all_saved != group.size():
            stdout = self.get_current_stdout()
            stdout_lines = [line for line in stdout.split("\n") if line]
            if self.state != self.S3MoverState.IDLE:
                self._warning(
                    f"[S3] Waiting {self.state.value}: {all_saved} / {group.size()}. Stdout: {len(stdout_lines)} end: {stdout_lines[-1:]}",
                )
            # Sync all our saves over NCCL; we could do a dist barrier later instead, but this helps us avoid losing NCCL connections down the line
            test_tensor = torch.tensor([self.is_previous_save_finished()], device=torch.device("cuda"))
            test_tensor_list = [torch.zeros_like(test_tensor) for _ in range(group.size())]
            dist.all_gather(test_tensor_list, test_tensor, group=group, async_op=False)
            dist.barrier()
            all_saved = sum(bool(tensor.item()) for tensor in test_tensor_list)
            time.sleep(1)  # TODO @nouamane: make this configurable
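
Below is a minimal usage sketch of how this method typically fits into a checkpointing loop. It is a hypothetical fragment: the S3Mover constructor arguments, paths, and the surrounding training/saving steps are illustrative assumptions, not the exact nanotron API; only distributed_wait_for_completion() comes from this file.

    from nanotron.s3_checkpoints.s3_mover import S3Mover

    # Hypothetical setup: the paths and constructor arguments here are assumptions.
    mover = S3Mover(local_path="/scratch/checkpoints", s3_path="s3://my-bucket/run")

    num_steps, checkpoint_interval = 1_000, 100
    for step in range(num_steps):
        ...  # one training step (placeholder)
        if step > 0 and step % checkpoint_interval == 0:
            # Block every rank until the previous checkpoint upload has finished,
            # so no rank starts writing a new checkpoint over files still in flight.
            mover.distributed_wait_for_completion()  # uses the default process group
            ...  # save the new checkpoint locally, then start the next async upload (assumed flow)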