in sample_workloads/megatron-gke/docker/monitor_collectives.py [0:0]
def traced_batch_isend_irecv(p2p_op_list):
  """Intercepts invocations of torch.distributed.batch_isend_irecv.

  For every point-to-point op that this rank is configured to record, emits a
  call description carrying the op kind ('send' or 'recv'), the message size
  in bytes, the process group and the peer rank. The description is presumably
  paired downstream with kernel timing to derive
  [P2P-B/W] = [Message Size]/[Kernel Time]; that pairing is not shown here.
  The whole list is then handed to the original (untraced) implementation.

  Args:
    p2p_op_list: Sequence of P2POp-like objects exposing `.op`, `.tensor`,
      `.group` and `.peer`; passed unchanged to
      torch.distributed.batch_isend_irecv.

  Returns:
    Output of torch.distributed.batch_isend_irecv.
  """
  dist = torch.distributed
  for op_entry in p2p_op_list:
    group, peer = op_entry.group, op_entry.peer
    # P2P exchanges are point-to-point rather than ring-based.
    if not _should_rank_record_comm(group, peer_rank=peer, is_ring=False):
      continue

    # NOTE(review): only an op identical to the saved original isend counts as
    # a send; anything else (including a wrapped isend) is labelled 'recv'.
    # Confirm that P2POp entries are built with the untraced isend.
    is_send = op_entry.op == dist.untraced_isend
    tensor = op_entry.tensor
    num_bytes = tensor.element_size() * tensor.nelement()

    _emit_call_description(
        'send' if is_send else 'recv', num_bytes, group, peer)

  return dist.untraced_batch_isend_irecv(p2p_op_list)