def traced_gather()

in sample_workloads/megatron-gke/docker/monitor_collectives.py


def traced_gather(
    tensor, gather_list=None, dst=0, group=None, async_op=False):
  """Intercepts invocations of torch.distributed.gather.

  Let T := sum([Receive Kernel Time from Rank i] for i != dst)
  Calculate [P2P-B/W] = [Message Size]/T

  Each of (n-1) ranks sends a message to the root.

  Note that any correction factor for the bus bandwidth (e.g. (n-1)/n) depends
  on the *definition* of 'Message Size'. In some cases, such as 'gather', we
  define 'Message Size' to omit the data that is already local to the
  destination GPU, so no correction factor is needed. The NCCL tests instead
  assume all ranks send equal-sized messages and include the data already
  resident on the destination GPU, which is why they apply an (n-1)/n
  correction factor when calculating the bus bandwidth. In general, the goal
  of computing the bus bandwidth is to compare data transfer rates on the bus
  against the peak bus bandwidth.
  See https://github.com/NVIDIA/nccl-tests/blob/master/doc/PERFORMANCE.md.

  https://github.com/NVIDIA/nccl-tests/blob/1a5f551ffd6e/src/gather.cu#L54
  https://github.com/pytorch/pytorch/blob/bfd995f0d6bf/torch/csrc/cuda/nccl.cpp#L1040

  Args:
    tensor: Passed to torch.distributed.gather
    gather_list: Passed to torch.distributed.gather
    dst: Passed to torch.distributed.gather
    group: Passed to torch.distributed.gather
    async_op: Passed to torch.distributed.gather

  Returns:
    Output of torch.distributed.gather
  """
  if _should_rank_record_comm(group, root_rank=dst, is_ring=False):
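    # Total bytes gathered onto dst, excluding the shard already local to it.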
    message_size = functools.reduce(
        lambda sz, x: sz + x.nelement() * x.element_size(), gather_list, 0)
    message_size -= tensor.nelement() * tensor.element_size()

    _emit_call_description('gather', message_size, group, root_rank=dst)

  return torch.distributed.untraced_gather(
      tensor, gather_list, dst, group, async_op)
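

As a concrete illustration of the message-size convention described in the
docstring, the sketch below reproduces the same byte count for a hypothetical
4-rank gather of 1 MiB-per-rank fp16 shards and converts it to a P2P
bandwidth. The shapes and the kernel time are made-up values for illustration,
not output of this tracer.

import torch

# Hypothetical setup: 4 ranks each contribute a 512x1024 fp16 shard (1 MiB),
# gathered onto rank 0.
world_size = 4
tensor = torch.empty(512, 1024, dtype=torch.float16)
gather_list = [torch.empty_like(tensor) for _ in range(world_size)]

# Same arithmetic as traced_gather: total bytes in gather_list, minus the
# shard already resident on the destination GPU.
message_size = sum(x.nelement() * x.element_size() for x in gather_list)
message_size -= tensor.nelement() * tensor.element_size()
assert message_size == 3 * 1024 * 1024  # (n-1) * 1 MiB; no (n-1)/n factor.

# [P2P-B/W] = [Message Size] / T, with T the summed receive-kernel time.
T = 1e-3  # seconds, made up for the example
print(f'P2P bandwidth: {message_size / T / 1e9:.2f} GB/s')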
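
The call to torch.distributed.untraced_gather implies that
monitor_collectives.py stashes the original collective under that name and
rebinds torch.distributed.gather to traced_gather. The hook itself is not
shown in this excerpt, so the following is only a minimal sketch of how such a
patch could be installed under that assumption; _install_gather_tracing is a
hypothetical helper name.

import torch.distributed


def _install_gather_tracing():
  """Illustrative only: routes torch.distributed.gather through traced_gather."""
  if not hasattr(torch.distributed, 'untraced_gather'):
    # Keep a handle to the real collective so traced_gather can still call it.
    torch.distributed.untraced_gather = torch.distributed.gather
    torch.distributed.gather = traced_gather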