in astra-sim-alibabacloud/astra-sim/system/collective/NcclTreeFlowModel.cc [445:511]
void NcclTreeFlowModel::insert_packets(int channel_id, int flow_id) {
MockNcclLog* NcclLog = MockNcclLog::getInstance();
assert(channel_id < m_channels);
if (!enabled) {
return;
}
assert(_flow_models.count(std::make_pair(channel_id, flow_id)) != 0);
MockNccl::SingleFlow f = _flow_models[std::make_pair(channel_id, flow_id)];
assert(zero_latency_packets->count(channel_id) != 0 && non_zero_latency_packets->count(channel_id) != 0);
if ((*zero_latency_packets)[channel_id] == 0 && (*non_zero_latency_packets)[channel_id] == 0) {
(*zero_latency_packets)[channel_id] = parallel_reduce * 1;
(*non_zero_latency_packets)[channel_id] = get_non_zero_latency_packets();
toggle = !toggle;
}
int current_receiver = f.dest;
std::vector<int> current_sender = f.prev;
if ((*zero_latency_packets)[channel_id] > 0) {
NcclLog->writeLog(NcclLogLevel::DEBUG,"id: %d (*zero_latency_packets)[channel_id] > 0",id);
uint64_t message_size = f.flow_size;
packets[std::make_pair(channel_id, flow_id)].push_back(MyPacket(
stream->current_queue_id,
current_sender[0],
current_receiver,
message_size,
channel_id,
flow_id));
packets[std::make_pair(channel_id, flow_id)].back().set_flow_id(flow_id);
packets[std::make_pair(channel_id, flow_id)].back().sender = nullptr;
processed = false;
send_back = false;
NPU_to_MA = true;
release_packets(channel_id, flow_id, message_size);
(*zero_latency_packets)[channel_id]--;
NcclLog->writeLog(NcclLogLevel::DEBUG,"id: %d (*zero_latency_packets)[channel_id] : %d ",id,(*zero_latency_packets)[channel_id]);
return;
} else if ((*non_zero_latency_packets)[channel_id] > 0) {
NcclLog->writeLog(NcclLogLevel::DEBUG,"id: %d (*non_zero_latency_packets)[channel_id] > 0",id);
uint64_t message_size = f.flow_size;
packets[std::make_pair(channel_id, flow_id)].push_back(MyPacket(
stream->current_queue_id,
current_sender[0],
current_receiver,
message_size,
channel_id,
flow_id));
packets[std::make_pair(channel_id, flow_id)].back().set_flow_id(flow_id);
packets[std::make_pair(channel_id, flow_id)].back().sender = nullptr;
if (comType == ComType::Reduce_Scatter ||
(comType == ComType::All_Reduce && toggle)) {
processed = true;
} else {
processed = false;
}
if ((*non_zero_latency_packets)[channel_id] <= parallel_reduce * 1) {
send_back = false;
} else {
send_back = true;
}
NPU_to_MA = false;
release_packets(channel_id, flow_id, message_size);
(*non_zero_latency_packets)[channel_id]--;
NcclLog->writeLog(NcclLogLevel::DEBUG,"id: %d (*non_zero_latency_packets)[channel_id] : %d ",id,(*non_zero_latency_packets)[channel_id]);
return;
}
Sys::sys_panic("should not inject nothing!");
}