in astra-sim-alibabacloud/astra-sim/system/collective/Ring.cc [180:230]
/**
 * Queues one new packet for the stream's current queue and sets the
 * per-packet flags (processed / send_back / NPU_to_MA) before handing off
 * to process_max_count().
 *
 * Does nothing when the algorithm is not enabled. When both packet budgets
 * are exhausted, they are refilled and `toggle` is flipped before the
 * packet is created. Zero-latency packets are injected first; non-zero
 * latency packets follow once the zero-latency budget is used up.
 *
 * @param sender Callable stored in the new packet's `sender` field.
 */
void Ring::insert_packet(Callable* sender) {
  if (!enabled) {
    return;
  }

  // Both budgets are used up: refill them and flip the toggle.
  if (zero_latency_packets == 0 && non_zero_latency_packets == 0) {
    zero_latency_packets = parallel_reduce;
    non_zero_latency_packets =
        get_non_zero_latency_packets(); //(nodes_in_ring-1)*parallel_reduce*1;
    toggle = !toggle;
  }

  // Decide which kind of packet to inject before touching the queues, so
  // the "nothing to inject" case panics without creating a packet.
  const bool inject_zero_latency = zero_latency_packets > 0;
  if (!inject_zero_latency && !(non_zero_latency_packets > 0)) {
    Sys::sys_panic("should not inject nothing!");
    return;
  }

  // Packet creation is identical for both kinds of packet.
  packets.push_back(MyPacket(
      stream->current_queue_id,
      current_sender,
      current_receiver)); // vnet Must be changed for alltoall topology
  packets.back().sender = sender;
  locked_packets.push_back(&packets.back());

  if (inject_zero_latency) {
    processed = false;
    send_back = false;
    NPU_to_MA = true;
    process_max_count();
    zero_latency_packets--;
    return;
  }

  // Non-zero latency packet: marked processed for Reduce_Scatter, or for
  // All_Reduce while `toggle` is set.
  processed = comType == ComType::Reduce_Scatter ||
      (comType == ComType::All_Reduce && toggle);
  // send_back is cleared once the remaining non-zero latency budget has
  // dropped to parallel_reduce or fewer packets.
  send_back = non_zero_latency_packets > parallel_reduce;
  NPU_to_MA = false;
  // '\n' instead of std::endl: same text, but no forced flush per packet.
  std::cout << "id: " << id << " non-zero latency packets at tick: "
            << Sys::boostedTick() << '\n';
  process_max_count();
  non_zero_latency_packets--;
}