void Ring::insert_packet()

in astra-sim-alibabacloud/astra-sim/system/collective/Ring.cc [180:230]


// Queues one packet for the current sender/receiver pair on behalf of
// `sender` and updates the per-packet state flags before calling
// process_max_count().
//
// Does nothing when the ring is not `enabled`. When both packet budgets are
// at zero they are refilled first and `toggle` is flipped. Exactly one
// packet is injected per call: a zero-latency one while that budget lasts,
// otherwise a non-zero-latency one. If neither budget has anything left
// after the refill, Sys::sys_panic is invoked.
//
// sender: stored in the `sender` field of the newly queued packet.
void Ring::insert_packet(Callable* sender) {
  if (!enabled) {
    return;
  }

  // Both budgets drained: start a fresh round and flip the phase toggle.
  const bool budgets_drained =
      zero_latency_packets == 0 && non_zero_latency_packets == 0;
  if (budgets_drained) {
    zero_latency_packets = parallel_reduce;
    // Original note gives the formula as (nodes_in_ring-1)*parallel_reduce*1
    // -- presumably what the helper returns; verify against its definition.
    non_zero_latency_packets = get_non_zero_latency_packets();
    toggle = !toggle;
  }

  // Shared by both branches: append a packet, tag it with the caller, and
  // record its address in the locked list.
  const auto enqueue_locked_packet = [this, sender]() {
    // vnet must be changed for alltoall topology.
    packets.push_back(
        MyPacket(stream->current_queue_id, current_sender, current_receiver));
    auto& queued = packets.back();
    queued.sender = sender;
    locked_packets.push_back(&queued);
  };

  if (zero_latency_packets > 0) {
    enqueue_locked_packet();
    processed = false;
    send_back = false;
    NPU_to_MA = true;
    process_max_count();
    --zero_latency_packets;
    return;
  }

  if (non_zero_latency_packets > 0) {
    enqueue_locked_packet();
    // Marked processed for reduce-scatter, and for all-reduce only while
    // `toggle` is set.
    processed = comType == ComType::Reduce_Scatter ||
        (comType == ComType::All_Reduce && toggle);
    // send_back is cleared once the remaining budget is no larger than
    // parallel_reduce.
    send_back = non_zero_latency_packets > parallel_reduce;
    NPU_to_MA = false;
    // Trace line printed for every non-zero-latency packet.
    std::cout << "id: " << id << " non-zero latency packets at tick: " << Sys::boostedTick() << std::endl;
    process_max_count();
    --non_zero_latency_packets;
    return;
  }

  // Reached only if the refill above left both budgets empty.
  Sys::sys_panic("should not inject nothing!");
}