in workload_generator/generate_deepspeed_stage1_2_workload.py [0:0]
def step(self):
    """Append the communication entries for one optimizer step to the workload.

    Sets ``current_op`` to ``"step"``, calls ``_reduce_ipg_grads()``, and
    then records, in this order:

    1. One all-reduce over the data-parallel group with ``msg_size=1``,
       staged as ``<current_op>.has_overflow``.
    2. One all-gather over the data-parallel group per shard, staged as
       ``<current_op>.all_gather_dp_groups``.

    The model's total element count is divided into
    ``max(total // allgather_bucket_size, 1)`` shards of equal size; the
    last shard also takes whatever is left over from the integer division.
    Each shard's element count is rounded up to the next multiple of
    ``dp_world_size`` before being scaled by ``elem_size``.
    """
    self.current_op = "step"
    self._reduce_ipg_grads()

    # Single size-1 all-reduce across the data-parallel group.
    overflow_entry = LogItem(
        comm_type=CommType.all_reduce,
        comm_group=CommGroup.dp_group,
        comm_group_size=self.dp_world_size,
        msg_size=1,
        stage=f"{self.current_op}.has_overflow",
    )
    self.workload.append(overflow_entry)

    total_elems = sum(p.numel() for p in self.model.parameters())
    # At least one shard is always emitted, even when the model is smaller
    # than a single bucket.
    shard_count = max(total_elems // self.allgather_bucket_size, 1)
    base_shard = total_elems // shard_count
    final_idx = shard_count - 1

    for shard_idx in range(shard_count):
        # The final shard absorbs the remainder of the integer division.
        if shard_idx == final_idx:
            shard_elems = total_elems - shard_idx * base_shard
        else:
            shard_elems = base_shard

        # Round up so the shard divides evenly across the data-parallel ranks.
        leftover = shard_elems % self.dp_world_size
        if leftover:
            shard_elems = shard_elems + (self.dp_world_size - leftover)

        gather_entry = LogItem(
            comm_type=CommType.all_gather,
            comm_group=CommGroup.dp_group,
            comm_group_size=self.dp_world_size,
            # presumably elem_size is bytes per element -- confirm against
            # where it is assigned.
            msg_size=shard_elems * self.elem_size,
            stage=f"{self.current_op}.all_gather_dp_groups",
        )
        self.workload.append(gather_entry)