def traced_batch_isend_irecv()

in sample_workloads/megatron-gke/docker/monitor_collectives.py [0:0]


def traced_batch_isend_irecv(p2p_op_list):
  """Logs each op of a P2P batch, then forwards the batch unchanged.

  Stand-in for torch.distributed.batch_isend_irecv. Every op for which
  `_should_rank_record_comm` returns a truthy value is reported through
  `_emit_call_description` as a 'send' or 'recv', together with the byte size
  of its tensor.

  The logged message size is meant to feed
  [P2P-B/W] = [Message Size]/[Kernel Time] for each send and recv; kernel time
  itself is not measured in this function.

  Args:
    p2p_op_list: Passed to torch.distributed.batch_isend_irecv. Each element is
      read for its `op`, `tensor`, `group` and `peer` attributes.

  Returns:
    Output of torch.distributed.untraced_batch_isend_irecv (presumably the
    original torch.distributed.batch_isend_irecv saved under that name --
    confirm against the patching code).
  """
  for op_desc in p2p_op_list:
    recorded = _should_rank_record_comm(
        op_desc.group, peer_rank=op_desc.peer, is_ring=False)
    if not recorded:
      continue

    # Any op that is not the untraced isend is labelled as a receive.
    if op_desc.op == torch.distributed.untraced_isend:
      direction = 'send'
    else:
      direction = 'recv'

    # Payload size in bytes: element count times bytes per element.
    payload = op_desc.tensor
    num_bytes = payload.nelement() * payload.element_size()
    _emit_call_description(direction, num_bytes, op_desc.group, op_desc.peer)

  return torch.distributed.untraced_batch_isend_irecv(p2p_op_list)