in src/nanotron/s3_checkpoints/s3_mover.py [0:0]
def distributed_wait_for_completion(self, group: Optional[ProcessGroup] = None):
    """Wait for the previous checkpoint to be fully uploaded and removed in a distributed setting.

    Will wait for all processes to be ready.
    """
    if group is None:
        group = dist.torch_dist.distributed_c10d._get_default_group()

    # Gather a "previous save finished" flag from every rank in the group.
    test_tensor = torch.tensor([self.is_previous_save_finished()], device=torch.device("cuda"))
    test_tensor_list = [torch.zeros_like(test_tensor) for _ in range(group.size())]
    dist.all_gather(test_tensor_list, test_tensor, group=group, async_op=False)
    dist.barrier()
    all_saved = sum(bool(tensor.item()) for tensor in test_tensor_list)
    if all_saved != group.size() and self.state != self.S3MoverState.IDLE:
        self._warning(
            f"Waiting until the previous checkpoint save is finished - S3Mover {dist.get_rank(group)} still in {self.state} state.",
        )
    # Poll until every rank reports that its previous save is finished.
    while all_saved != group.size():
        stdout = self.get_current_stdout()
        stdout_lines = [line for line in stdout.split("\n") if line]
        if self.state != self.S3MoverState.IDLE:
            self._warning(
                f"[S3] Waiting {self.state.value}: {all_saved} / {group.size()}. Stdout: {len(stdout_lines)} end: {stdout_lines[-1:]}",
            )
        # Re-sync the flags over NCCL; a plain dist barrier would also work, but the all_gather
        # helps us avoid losing NCCL connections down the line.
        test_tensor = torch.tensor([self.is_previous_save_finished()], device=torch.device("cuda"))
        test_tensor_list = [torch.zeros_like(test_tensor) for _ in range(group.size())]
        dist.all_gather(test_tensor_list, test_tensor, group=group, async_op=False)
        dist.barrier()
        all_saved = sum(bool(tensor.item()) for tensor in test_tensor_list)
        time.sleep(1)  # TODO @nouamane: make this configurable
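
For reference, below is a minimal standalone sketch of the same readiness-polling pattern: every rank publishes a 0/1 flag, all_gather makes all flags visible on all ranks, and the loop exits once every flag is set. The helper name wait_until_all_ranks_ready and the work_is_done callback are hypothetical and not part of nanotron; the sketch assumes torch.distributed has already been initialized (e.g. via torchrun) with an NCCL or Gloo backend.

# Hypothetical standalone helper illustrating the all_gather readiness check used above.
# Assumes dist.init_process_group() has already been called on every rank.
import time

import torch
import torch.distributed as dist


def wait_until_all_ranks_ready(work_is_done, device="cuda", poll_interval=1.0):
    """Block until work_is_done() returns True on every rank of the default group."""
    world_size = dist.get_world_size()
    while True:
        # Each rank publishes its own 0/1 flag; all_gather exposes all flags to all ranks.
        flag = torch.tensor([work_is_done()], device=device, dtype=torch.int64)
        flags = [torch.zeros_like(flag) for _ in range(world_size)]
        dist.all_gather(flags, flag)
        dist.barrier()
        if sum(int(f.item()) for f in flags) == world_size:
            return
        time.sleep(poll_interval)

A caller would pass its own readiness check, e.g. wait_until_all_ranks_ready(lambda: upload_process.poll() is not None), mirroring how distributed_wait_for_completion polls is_previous_save_finished() before starting the next checkpoint.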