in workload_generator/generate_deepspeed_stage1_2_workload.py
def _reduce_ipg_grads(self):
    # Without contiguous gradients, fall back to the default reduction path.
    if not self.args.contiguous_gradients:
        super()._reduce_ipg_grads()
        return
    # Coalesce per-parameter (rank, start, end) ranges into contiguous runs
    # per rank. The sentinel entry keeps the first comparison valid.
    rank_start_end_idx = [[-1, -1, -1]]
    for param in self.reduce_bucket[::-1]:
        for rank, start_idx, end_idx in self.param_range_map[id(param)]:
            if rank == rank_start_end_idx[-1][0]:
                # Same rank as the previous range: extend it, warning if the
                # new range does not start where the last one ended.
                if rank_start_end_idx[-1][-1] != start_idx:
                    print(f"WARNING: non-contiguous range {rank_start_end_idx[-1]} -> {start_idx}")
                rank_start_end_idx[-1][-1] = end_idx
            else:
                rank_start_end_idx.append([rank, start_idx, end_idx])
    # Emit one reduce per coalesced range (skipping the sentinel), each
    # destined for the rank that owns that slice of the flat gradient buffer.
    for rank, start_idx, end_idx in rank_start_end_idx[1:]:
        self.workload.append(
            LogItem(
                comm_type=CommType.reduce,
                comm_group=CommGroup.dp_group,
                msg_size=(end_idx - start_idx) * self.elem_size,
                comm_group_size=self.dp_world_size,
                stage=f"{self.current_op}.average_tensor",
                dst=rank,
            )
        )
    # Reset the bucket after flushing it.
    self.reduce_bucket, self.num_in_reduce_bucket = [], 0
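
The coalescing step can be read in isolation: adjacent ranges assigned to the same rank are merged so each DP rank receives a single reduce per bucket flush. A minimal standalone sketch with hypothetical (rank, start, end) tuples; the name coalesce_ranges is illustrative, not part of the source:

def coalesce_ranges(ranges):
    merged = [[-1, -1, -1]]  # sentinel so the first comparison is valid
    for rank, start, end in ranges:
        if rank == merged[-1][0]:
            merged[-1][-1] = end  # extend the previous run for this rank
        else:
            merged.append([rank, start, end])
    return merged[1:]  # drop the sentinel

# Two adjacent ranges on rank 0 collapse into one; rank 1 stays separate.
print(coalesce_ranges([(0, 0, 4), (0, 4, 8), (1, 8, 12)]))
# -> [[0, 0, 8], [1, 8, 12]]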