sample_workloads/lit-gpt-demo/utilities/monitor_collectives.py [411:516]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  return torch.distributed.untraced_batch_isend_irecv(p2p_op_list)


# pylint: disable=g-doc-args,g-doc-return-or-yield
def traced_isend(tensor, dst, group=None, tag=0):
  """Intercepts invocations of torch.distributed.isend.

  Calculate [P2P-B/W] = [Message Size]/[Kernel Time]
  """
  if _should_rank_record_comm(group, peer_rank=dst, is_ring=False):
    message_size = tensor.nelement() * tensor.element_size()
    _emit_call_description('send', message_size, group, dst)

  return torch.distributed.untraced_isend(tensor, dst, group, tag)


# pylint: disable=g-doc-args,g-doc-return-or-yield
def traced_irecv(tensor, src=None, group=None, tag=0):
  """Intercepts invocations of torch.distributed.irecv.
  """
  if _should_rank_record_comm(group, peer_rank=src, is_ring=False):
    message_size = tensor.nelement() * tensor.element_size()
    _emit_call_description('recv', message_size, group, src)

  return torch.distributed.untraced_irecv(tensor, src, group, tag)


# pylint: disable=g-doc-args,g-doc-return-or-yield
def traced_send(tensor, dst, group=None, tag=0):
  """Intercepts invocations of torch.distributed.send.
  """
  if _should_rank_record_comm(group, peer_rank=dst, is_ring=False):
    message_size = tensor.nelement() * tensor.element_size()
    _emit_call_description('send', message_size, group, dst)

  return torch.distributed.untraced_send(tensor, dst, group, tag)


# pylint: disable=g-doc-args,g-doc-return-or-yield
def traced_recv(tensor, src=None, group=None, tag=0):
  """Intercepts invocations of torch.distributed.recv.
  """
  if _should_rank_record_comm(group, peer_rank=src, is_ring=False):
    message_size = tensor.nelement() * tensor.element_size()
    _emit_call_description('recv', message_size, group, src)

  return torch.distributed.untraced_recv(tensor, src, group, tag)


@functools.lru_cache(maxsize=None)
def _should_rank_record_comm(
    group=None, peer_rank=None, root_rank=None, is_ring=True):
  """Decides whether a given torch.distributed collective should be recorded.

  Args:
    group: The torch process group (i.e. participating GPUs) in this collective.
    peer_rank: In direct peer to peer operations, the global rank of the peer.
    root_rank: The global rank of the root GPU, for collectives with a root.
    as_ring: Whether the default NCCL implementation uses a ring algorithm.
    Specifying 'peer_rank' and 'is_ring=True' are incompatible.

  Returns:
    Whether to record a descriptive NVTX marker, and possibly print a log trace.
  """
  if not _is_current_process_in_group(group):
    return False
  if _TRACE_MODE == 'crossnode' and not _is_crossnode_comm(group, peer_rank):
    return False
  if not is_ring and root_rank is not None:
    return torch.distributed.get_rank() == root_rank

  return True


def _is_current_process_in_group(group=None):
  return torch.distributed.get_rank(group) >= 0


@functools.lru_cache(maxsize=None)
def _is_crossnode_comm(group=None, peer_rank=None):
  """Whether this collective involves communication across nodes.

  Args:
    group: The torch process group (i.e. participating GPUs) in this collective.
    peer: In direct peer to peer operations, the global rank of the peer.

  Returns:
    Whether this collective involves communications across nodes.
  """
  count_per_node = torch.cuda.device_count()

  if peer_rank is not None:
    this_node = int(torch.distributed.get_rank() / count_per_node)
    peer_node = int(peer_rank / count_per_node)
    return this_node != peer_node
  else:
    if group is not None:
      ranks = torch.distributed.get_process_group_ranks(group=group)
    else:
      ranks = [*range(torch.distributed.get_world_size())]

    nodes = list(map(lambda rank: int(rank / count_per_node), ranks))
    return any([node != nodes[0] for node in nodes])


def _emit_call_description(
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



sample_workloads/megatron-gke/docker/monitor_collectives.py [504:641]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  return torch.distributed.untraced_batch_isend_irecv(p2p_op_list)


def traced_isend(tensor, dst, group=None, tag=0):
  """Intercepts invocations of torch.distributed.isend.

  Calculate [P2P-B/W] = [Message Size]/[Kernel Time]

  Args:
    tensor: Passed to torch.distributed.isend
    dst: Passed to torch.distributed.isend
    group: Passed to torch.distributed.isend
    tag: Passed to torch.distributed.isend.

  Returns:
    Output of torch.distributed.isend
  """
  if _should_rank_record_comm(group, peer_rank=dst, is_ring=False):
    message_size = tensor.nelement() * tensor.element_size()
    _emit_call_description('send', message_size, group, dst)

  return torch.distributed.untraced_isend(tensor, dst, group, tag)


def traced_irecv(tensor, src=None, group=None, tag=0):
  """Intercepts invocations of torch.distributed.irecv.

  Args:
    tensor: Passed to torch.distributed.irecv
    src: Passed to torch.distributed.irecv
    group: Passed to torch.distributed.irecv
    tag: Passed to torch.distributed.irecv.

  Returns:
    Output of torch.distributed.irecv
  """
  if _should_rank_record_comm(group, peer_rank=src, is_ring=False):
    message_size = tensor.nelement() * tensor.element_size()
    _emit_call_description('recv', message_size, group, src)

  return torch.distributed.untraced_irecv(tensor, src, group, tag)


def traced_send(tensor, dst, group=None, tag=0):
  """Intercepts invocations of torch.distributed.send.

  Args:
    tensor: Passed to torch.distributed.send
    dst: Passed to torch.distributed.send
    group: Passed to torch.distributed.send
    tag: Passed to torch.distributed.send.

  Returns:
    Output of torch.distributed.send
  """
  if _should_rank_record_comm(group, peer_rank=dst, is_ring=False):
    message_size = tensor.nelement() * tensor.element_size()
    _emit_call_description('send', message_size, group, dst)

  return torch.distributed.untraced_send(tensor, dst, group, tag)


def traced_recv(tensor, src=None, group=None, tag=0):
  """Intercepts invocations of torch.distributed.recv.

  Args:
    tensor: Passed to torch.distributed.recv
    src: Passed to torch.distributed.recv
    group: Passed to torch.distributed.recv
    tag: Passed to torch.distributed.recv.

  Returns:
    Output of torch.distributed.recv
  """
  if _should_rank_record_comm(group, peer_rank=src, is_ring=False):
    message_size = tensor.nelement() * tensor.element_size()
    _emit_call_description('recv', message_size, group, src)

  return torch.distributed.untraced_recv(tensor, src, group, tag)


@functools.lru_cache(maxsize=None)
def _should_rank_record_comm(
    group=None, peer_rank=None, root_rank=None, is_ring=True):
  """Decides whether a given torch.distributed collective should be recorded.

  Args:
    group: The torch process group (i.e. participating GPUs) in this collective.
    peer_rank: In direct peer to peer operations, the global rank of the peer.
    root_rank: The global rank of the root GPU, for collectives with a root.
    is_ring: Whether the default NCCL implementation uses a ring algorithm.
    Specifying 'peer_rank' and 'is_ring=True' are incompatible.

  Returns:
    Whether to record a descriptive NVTX marker, and possibly print a log trace.
  """
  if not _is_current_process_in_group(group):
    return False
  if _TRACE_MODE == 'crossnode' and not _is_crossnode_comm(group, peer_rank):
    return False
  if not is_ring and root_rank is not None:
    return torch.distributed.get_rank() == root_rank

  return True


def _is_current_process_in_group(group=None):
  return torch.distributed.get_rank(group) >= 0


@functools.lru_cache(maxsize=None)
def _is_crossnode_comm(group=None, peer_rank=None):
  """Whether this collective involves communication across nodes.

  Args:
    group: The torch process group (i.e. participating GPUs) in this collective.
    peer_rank: In direct peer to peer operations, the global rank of the peer.

  Returns:
    Whether this collective involves communications across nodes.
  """
  count_per_node = torch.cuda.device_count()

  if peer_rank is not None:
    this_node = int(torch.distributed.get_rank() / count_per_node)
    peer_node = int(peer_rank / count_per_node)
    return this_node != peer_node
  else:
    if group is not None:
      ranks = torch.distributed.get_process_group_ranks(group=group)
    else:
      ranks = [*range(torch.distributed.get_world_size())]

    nodes = list(map(lambda rank: int(rank / count_per_node), ranks))
    return any([node != nodes[0] for node in nodes])


def _emit_call_description(
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



