in sample_workloads/megatron-gke/docker/monitor_collectives.py [0:0]
def traced_gather(
    tensor, gather_list=None, dst=0, group=None, async_op=False):
"""Intercepts invocations of torch.distributed.gather.
Let T := sum([Receive Kernel Time from Rank i] for i != dst)
Calculate [P2P-B/W] = [Message Size]/T
Each of (n-1) ranks sends a message to the root.
Note that any correction factors for the bus bandwidth (e.g. [n-1]/n) depend
on the *definition* of 'Message Size'. In some cases, such as for 'gather', we
define 'Message Size' so as to omit the size of data that is already local
to the destination GPU for the 'gather' operation. In this case, no correction
factor is needed. In NCCL tests, they assume all ranks send equal sized
messages and include this size of data already resident on the destination
GPU. Thus, in there case you see a (n-1)/n correction factor on calculating
the bus bandwidth. In general, the goal of computing the bus bandwidth is
to compare data transfer rates on the bus relative to peak bus bandwidth.
See https://github.com/NVIDIA/nccl-tests/blob/master/doc/PERFORMANCE.md.
https://github.com/NVIDIA/nccl-tests/blob/1a5f551ffd6e/src/gather.cu#L54
https://github.com/pytorch/pytorch/blob/bfd995f0d6bf/torch/csrc/cuda/nccl.cpp#L1040
Args:
tensor: Passed to torch.distributed.gather
gather_list: Passed to torch.distributed.gather
dst: Passed to torch.distributed.gather
group: Passed to torch.distributed.gather
async_op: Passed to torch.distributed.gather
Returns:
Output of torch.distributed.gather
"""
  if _should_rank_record_comm(group, root_rank=dst, is_ring=False):
    # Total bytes landing at the root, excluding the shard already local to
    # the destination rank (matching the 'Message Size' definition above).
    message_size = functools.reduce(
        lambda sz, x: sz + x.nelement() * x.element_size(), gather_list, 0)
    message_size -= tensor.nelement() * tensor.element_size()
    _emit_call_description('gather', message_size, group, root_rank=dst)
  # Delegate to the original, un-patched torch.distributed.gather.
  return torch.distributed.untraced_gather(
      tensor, gather_list, dst, group, async_op)
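

# The helper below is a minimal sketch, not part of the original script; its
# name, arguments, and default values are hypothetical. It illustrates the
# docstring's point about correction factors: with 'Message Size' defined to
# exclude the shard already local to the destination GPU (as traced_gather
# does), no correction is needed, whereas the NCCL-tests definition counts all
# n shards and therefore applies an (n-1)/n factor. Both conventions yield the
# same bus bandwidth.
def _gather_bus_bw_example(n_ranks=8, bytes_per_rank=64 * 2**20,
                           recv_time_s=0.01):
  """Returns the gather bus bandwidth computed under both conventions."""
  # Convention used here: omit the destination's local shard, no correction.
  bus_bw_local_excluded = (n_ranks - 1) * bytes_per_rank / recv_time_s
  # NCCL-tests convention: count all n shards, then apply the (n-1)/n factor.
  bus_bw_nccl_tests = (
      n_ranks * bytes_per_rank / recv_time_s) * (n_ranks - 1) / n_ranks
  # Both expressions reduce to (n-1) * bytes_per_rank / recv_time_s.
  assert abs(bus_bw_local_excluded - bus_bw_nccl_tests) <= (
      1e-9 * bus_bw_local_excluded)
  return bus_bw_local_excluded, bus_bw_nccl_tests

# How a wrapper like traced_gather is typically wired in (a sketch only; the
# actual hook-up lives elsewhere in monitor_collectives.py and may differ):
# the original function is saved under the 'untraced_gather' name that the
# wrapper delegates to, and the public name is then pointed at the wrapper.
#
#   torch.distributed.untraced_gather = torch.distributed.gather
#   torch.distributed.gather = traced_gather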