astra-sim-alibabacloud/astra-sim/system/collective/NcclTreeFlowModel.cc (714 lines of code) (raw):

/* *Copyright (c) 2024, Alibaba Group; *Licensed under the Apache License, Version 2.0 (the "License"); *you may not use this file except in compliance with the License. *You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 *Unless required by applicable law or agreed to in writing, software *distributed under the License is distributed on an "AS IS" BASIS, *WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *See the License for the specific language governing permissions and *limitations under the License. */ #ifdef PHY_MTP #include<mpi.h> #include "astra-sim/system/PhyMultiThread.hh" #endif #include<chrono> #include "NcclTreeFlowModel.hh" #include "astra-sim/system/PacketBundle.hh" #include "astra-sim/system/RecvPacketEventHadndlerData.hh" #include "astra-sim/system/MockNcclLog.h" #ifdef PHY_RDMA #include "astra-sim/system/SimAiFlowModelRdma.hh" extern FlowPhyRdma flow_rdma; #endif namespace AstraSim { std::atomic<bool> NcclTreeFlowModel::g_flow_inCriticalSection(false); NcclTreeFlowModel::NcclTreeFlowModel( ComType type, int id, int layer_num, RingTopology* ring_topology, uint64_t data_size, RingTopology::Direction direction, InjectionPolicy injection_policy, bool boost_mode, std::shared_ptr<MockNccl::FlowModels> ptr_flow_models, int treechannels) : Algorithm(layer_num){ this->start_time = std::chrono::high_resolution_clock::now(); this->end_time = std::chrono::high_resolution_clock::now(); this->comType = type; this->id = id; this->logicalTopology = ring_topology; this->data_size = data_size; this->nodes_in_ring = ring_topology->get_nodes_in_ring(); this->parallel_reduce = 1; this->toggle = false; this->name = Name::Ring; this->enabled = true; this->m_channels = treechannels; this->judge_exit_flag.store(false); this->judge_exit_mutex.unlock(); this->judge_mutex.unlock(); this->send_packets = 0; this->recv_packets = 0; pQps = new MockNccl::NcclQps(); zero_latency_packets = new std::map<int, int>(); non_zero_latency_packets = new std::map<int, int>(); if (boost_mode) { this->enabled = ring_topology->is_enabled(); } if (ring_topology->dimension == RingTopology::Dimension::Local) { transmition = MemBus::Transmition::Fast; } else { transmition = MemBus::Transmition::Usual; } if(ptr_flow_models){ if(id == 0) { MockNcclLog* NcclLog = MockNcclLog::getInstance(); } for(auto f : *ptr_flow_models) { if(f.second.dest == id) { this->free_packets[std::make_pair(f.second.channel_id,f.second.src)]++; this->_flow_models[f.first] = f.second; recv_packets++; } if(f.second.src == id) { if(pQps->peer_qps.count(std::make_pair(f.second.channel_id,std::make_pair(f.second.src,f.second.dest)))==0){ pQps->peer_qps[std::make_pair(f.second.channel_id,std::make_pair(f.second.src,f.second.dest))]=1; } NcclTreeFlowModel::FlowCriticalSection cs; this->_stream_count[f.second.channel_id] += 1; cs.ExitSection(); assert(this->_flow_models.count(f.first) == 0); this->_flow_models[f.first] = f.second; send_packets++; } } } for(int channel_id = 0 ;channel_id<m_channels;channel_id++){ assert(zero_latency_packets->find(channel_id) == zero_latency_packets->end()); (*zero_latency_packets)[channel_id] = 0; assert(non_zero_latency_packets->find(channel_id) == non_zero_latency_packets->end()); (*non_zero_latency_packets)[channel_id] = 0; } init_indegree_mapping(); switch (type) { case ComType::All_Reduce: this->final_data_size = data_size; break; case ComType::All_Gather: this->final_data_size = data_size * nodes_in_ring; break; case ComType::Reduce_Scatter: this->final_data_size = data_size / nodes_in_ring; break; case ComType::All_to_All: this->final_data_size = data_size; break; default:; } } void NcclTreeFlowModel::init_indegree_mapping(){ MockNccl::FlowModels::iterator tree_it; for(tree_it = _flow_models.begin();tree_it != _flow_models.end();tree_it++) { if(tree_it->second.src!=id) continue; indegree_mapping[tree_it->first.second] = tree_it->second.parent_flow_id.size(); } } int NcclTreeFlowModel::get_non_zero_latency_packets() { return (nodes_in_ring - 1) * parallel_reduce * 1; } void NcclTreeFlowModel::run(EventType event, CallData* data) { BasicEventHandlerData* ehd = (BasicEventHandlerData*)data; MockNcclLog* NcclLog = MockNcclLog::getInstance(); if (event == EventType::General) { int channel_id = ehd->channel_id; int flow_id = ehd->flow_id; #ifndef PHY_MTP ready(channel_id, flow_id); #else phy_ready(channel_id, flow_id); #endif } else if (event == EventType::PacketReceived) { MockNcclLog* NcclLog = MockNcclLog::getInstance(); RecvPacketEventHadndlerData* rcehd = (RecvPacketEventHadndlerData*)ehd; AstraSim::ncclFlowTag flowTag = rcehd->flowTag; int received_flow_id = flowTag.current_flow_id; int channel_id = flowTag.channel_id; std::vector<int> next_flow_list = flowTag.tree_flow_list; #ifdef PHY_MTP recv_packets--; if(!phy_iteratable(channel_id)){ return; } #else bool flow_exist = next_flow_list.size() == 0 ? true : false; for(int i = 0; i < next_flow_list.size(); ++ i) { int next_flow_id = next_flow_list[i]; if(next_flow_id == -1 || _flow_models.count(std::make_pair(channel_id, next_flow_id)) != 0) flow_exist = true; else { flow_exist = false; break; } } assert(flow_exist == true); NcclTreeFlowModel::FlowCriticalSection cs; free_packets[std::make_pair(channel_id, flowTag.sender_node)]--; bool tag = true; for (int i = 0; i < m_channels; i++) { if (_stream_count[i] != 0) { tag = false; break; } } cs.ExitSection(); if(tag) { ready(channel_id, -1); iteratable(channel_id); return; } #endif NcclLog->writeLog(NcclLogLevel::DEBUG,"PacketReceived sender_node: %d recevier %d current_flow id: %d channel_id: %d tag_id %d free_packets %d next_flow_list.size %d",flowTag.sender_node,flowTag.receiver_node,flowTag.current_flow_id,flowTag.channel_id,flowTag.tag_id,free_packets[std::make_pair(channel_id,flowTag.sender_node)],next_flow_list.size()); #ifdef PHY_MTP for (int next_flow_id : next_flow_list){ if (--indegree_mapping[next_flow_id] == 0) { phy_ready(channel_id, next_flow_id); } } #else flow_exist = true; bool flow_send = false; bool recv_finished_tag = true; for (auto it = free_packets.begin(); it != free_packets.end(); it++) { if (it->second != 0) { recv_finished_tag = false; break; } } NcclLog->writeLog(NcclLogLevel::DEBUG,"next_flow_list.size %d",next_flow_list.size()); for (int next_flow_id : next_flow_list) { NcclTreeFlowModel::FlowCriticalSection cs; if (indegree_mapping.count(next_flow_id) == 0) { flow_exist = false; cs.ExitSection(); break; } if (--indegree_mapping[next_flow_id] == 0) { MockNccl::SingleFlow cur_flow = _flow_models[std::make_pair(channel_id, next_flow_id)]; cs.ExitSection(); insert_packets(channel_id, next_flow_id); }else{ cs.ExitSection(); } } assert(flow_exist = true); #endif } else if (event == EventType::StreamInit) { #ifdef PHY_MTP MPI_Barrier(MPI_COMM_WORLD); for(auto single_flow: _flow_models){ if((single_flow.second.src==id||single_flow.second.dest==id)){ #ifdef PHY_RDMA flow_rdma.ibv_create_peer_qp(id,single_flow.second.channel_id,single_flow.second.src,single_flow.second.dest,single_flow.second.chunk_count + 1 ,single_flow.second.chunk_id,single_flow.second.flow_size); #endif } } MPI_Barrier(MPI_COMM_WORLD); auto now = std::chrono::system_clock::now(); auto now_us = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count(); NcclLog->writeLog(NcclLogLevel::DEBUG,"streamInit time %lld",now_us); start_time = std::chrono::high_resolution_clock::now(); #endif for (int i = 0; i < parallel_reduce; i++) { #ifndef PHY_MTP init_recv_ready(); #endif for(int j = 0; j < m_channels; j ++) { for(const auto flow_model : _flow_models) { if(flow_model.second.src!=id)continue; std::vector<int> parent_list = flow_model.second.parent_flow_id; if((parent_list.size() == 0 ) && flow_model.second.channel_id == j ) { #ifdef PHY_MTP if(flow_model.second.chunk_id == 0){ phy_ready(j, flow_model.second.flow_id); } #else if (flow_model.second.chunk_id == 0) { pQps->peer_qps[std::make_pair( flow_model.second.channel_id, std::make_pair( flow_model.second.src, flow_model.second.dest))] = 0; insert_packets(j,flow_model.second.flow_id); } else { pQps->peer_wating_tasks[std::make_pair( flow_model.second.channel_id, std::make_pair( flow_model.second.src, flow_model.second.dest))] .push(flow_model.second.flow_id); } #endif } } } #ifdef PHY_MTP waiting_to_exit(); NcclLog->writeLog(NcclLogLevel::DEBUG, "NcclTreeFlowModel::waiting_to_exit end "); #endif } } else if(event == EventType::PacketSentFinshed){ SendPacketEventHandlerData* rcehd = (SendPacketEventHandlerData*)ehd; AstraSim::ncclFlowTag flowTag = rcehd->flowTag; int sent_flow_id = flowTag.current_flow_id; int channel_id = flowTag.channel_id; std::vector<int> next_flow_list = flowTag.tree_flow_list; NcclLog->writeLog(NcclLogLevel::DEBUG,"PacketSentFinshed src %d dst %d channel_id %d flow_id %d",flowTag.sender_node,flowTag.receiver_node,flowTag.channel_id,flowTag.current_flow_id); reduce(channel_id,sent_flow_id); bool flow_exist = next_flow_list.size() == 0 ? true : false; #ifndef PHY_MTP NcclTreeFlowModel::FlowCriticalSection cs; pQps->peer_qps[std::make_pair(flowTag.channel_id,std::make_pair(flowTag.sender_node,flowTag.receiver_node))]=1; cs.ExitSection(); if(pQps->peer_wating_tasks[std::make_pair(flowTag.channel_id,std::make_pair(flowTag.sender_node,flowTag.receiver_node))].size()>0){ int cur_flow_id = pQps->peer_wating_tasks[std::make_pair(flowTag.channel_id,std::make_pair(flowTag.sender_node,flowTag.receiver_node))].front(); pQps->peer_wating_tasks[std::make_pair(flowTag.channel_id,std::make_pair(flowTag.sender_node,flowTag.receiver_node))].pop(); pQps->peer_qps[std::make_pair(flowTag.channel_id,std::make_pair(flowTag.sender_node,flowTag.receiver_node))]=0; insert_packets(channel_id,cur_flow_id); } iteratable(channel_id); #else phy_iteratable(channel_id); #endif } } bool NcclTreeFlowModel::init_recv_ready() { std::map<std::pair<int,std::vector<int>>,std::vector<int>> recv_ready_flows; for(auto flow : _flow_models){ if(flow.second.src!=id) continue; if(flow.second.chunk_id!=0)continue; if (flow.second.parent_flow_id.size() == 0) continue; std::pair<int, std::vector<int>> cur = std::make_pair(flow.second.channel_id, flow.second.prev); if (recv_ready_flows.count(cur) == 0) { recv_ready_flows[cur] = {flow.second.flow_id}; } else { std::vector<int> flow_ids = recv_ready_flows[cur]; bool flag = true; for (int flow_id : flow_ids) { MockNccl::SingleFlow old_flow = _flow_models[std::make_pair(flow.second.channel_id, flow_id)]; if (old_flow.parent_flow_id == flow.second.parent_flow_id) { flag = false; break; } } if (flag) { recv_ready_flows[cur].push_back(flow.second.flow_id); } } } std::map<std::pair<int,std::vector<int>>,std::vector<int>>::iterator recv_ready_flow_it; for(recv_ready_flow_it = recv_ready_flows.begin();recv_ready_flow_it!=recv_ready_flows.end();recv_ready_flow_it++){ for(int flow_id: recv_ready_flow_it->second){ recv_ready(recv_ready_flow_it->first.first,flow_id); } } return true; } bool NcclTreeFlowModel::recv_ready(int channel_id, int flow_id) { std::vector<int>recv_prevs; auto flow_model = _flow_models[std::make_pair(channel_id,flow_id)]; recv_prevs = flow_model.prev; MockNcclLog* NcclLog = MockNcclLog::getInstance(); for (int recv_prev : recv_prevs) { sim_request rcv_req; rcv_req.vnet = this->stream->current_queue_id; rcv_req.layerNum = layer_num; RecvPacketEventHadndlerData* ehd = new RecvPacketEventHadndlerData( stream, stream->owner->id, EventType::PacketReceived, recv_prev, 1); ehd->flowTag.child_flow_id = -1; ehd->flowTag.current_flow_id = -1; ehd->flowTag.channel_id = channel_id; ehd->flowTag.tag_id =layer_num*flow_model.chunk_count*m_channels+ flow_model.chunk_count*flow_model.channel_id; stream->owner->front_end_sim_recv( 0, Sys::dummy_data, _flow_models[std::make_pair(channel_id, flow_id)].flow_size, UINT8, recv_prev, channel_id, &rcv_req, &Sys::handleEvent, ehd); } return true; } void NcclTreeFlowModel::release_packets(int channel_id, int flow_id, uint64_t message_size) { MockNcclLog* NcclLog = MockNcclLog::getInstance(); if (NPU_to_MA == true) { (new PacketBundle( stream->owner, stream, {}, processed, send_back, message_size, transmition, channel_id, flow_id)) ->send_to_MA(); } else { (new PacketBundle( stream->owner, stream, {}, processed, send_back, message_size, transmition, channel_id, flow_id)) ->send_to_NPU(); } NcclLog->writeLog(NcclLogLevel::DEBUG,"id: %d finish release_packets",id); } void NcclTreeFlowModel::process_stream_count(int channel_id) { MockNcclLog*NcclLog = MockNcclLog::getInstance(); #ifdef PHY_MTP send_packets--; #else NcclTreeFlowModel::FlowCriticalSection cs; if (_stream_count[channel_id] > 0) { _stream_count[channel_id]--; } NcclLog->writeLog(NcclLogLevel::DEBUG,"NcclTreeFlowModel::process_stream_count channel_id %d _stream_count %d",channel_id,_stream_count[channel_id]); if (_stream_count[channel_id] == 0 && stream->state != StreamState::Dead) stream->changeState(StreamState::Zombie); cs.ExitSection(); #endif } void NcclTreeFlowModel::reduce(int channel_id, int flow_id) { process_stream_count(channel_id); #ifndef PHY_MTP if(!packets[std::make_pair(channel_id, flow_id)].empty()){ packets[std::make_pair(channel_id, flow_id)].pop_front(); } #endif } bool NcclTreeFlowModel::iteratable(int channel_id) { MockNcclLog* NcclLog = MockNcclLog::getInstance(); bool all_channel_finished = true, all_packets_freed = true; NcclTreeFlowModel::FlowCriticalSection cs; for(int i = 0; i < m_channels; ++ i) { if(_stream_count.count(i) != 0 && _stream_count[i] != 0) all_channel_finished = false; } for (auto it = free_packets.begin(); it != free_packets.end(); it++) { if (it->second != 0) { all_packets_freed = false; break; } } cs.ExitSection(); if (all_channel_finished == true && all_packets_freed == true) { exit(); return false; } return true; } void NcclTreeFlowModel::insert_packets(int channel_id, int flow_id) { MockNcclLog* NcclLog = MockNcclLog::getInstance(); assert(channel_id < m_channels); if (!enabled) { return; } assert(_flow_models.count(std::make_pair(channel_id, flow_id)) != 0); MockNccl::SingleFlow f = _flow_models[std::make_pair(channel_id, flow_id)]; assert(zero_latency_packets->count(channel_id) != 0 && non_zero_latency_packets->count(channel_id) != 0); if ((*zero_latency_packets)[channel_id] == 0 && (*non_zero_latency_packets)[channel_id] == 0) { (*zero_latency_packets)[channel_id] = parallel_reduce * 1; (*non_zero_latency_packets)[channel_id] = get_non_zero_latency_packets(); toggle = !toggle; } int current_receiver = f.dest; std::vector<int> current_sender = f.prev; if ((*zero_latency_packets)[channel_id] > 0) { NcclLog->writeLog(NcclLogLevel::DEBUG,"id: %d (*zero_latency_packets)[channel_id] > 0",id); uint64_t message_size = f.flow_size; packets[std::make_pair(channel_id, flow_id)].push_back(MyPacket( stream->current_queue_id, current_sender[0], current_receiver, message_size, channel_id, flow_id)); packets[std::make_pair(channel_id, flow_id)].back().set_flow_id(flow_id); packets[std::make_pair(channel_id, flow_id)].back().sender = nullptr; processed = false; send_back = false; NPU_to_MA = true; release_packets(channel_id, flow_id, message_size); (*zero_latency_packets)[channel_id]--; NcclLog->writeLog(NcclLogLevel::DEBUG,"id: %d (*zero_latency_packets)[channel_id] : %d ",id,(*zero_latency_packets)[channel_id]); return; } else if ((*non_zero_latency_packets)[channel_id] > 0) { NcclLog->writeLog(NcclLogLevel::DEBUG,"id: %d (*non_zero_latency_packets)[channel_id] > 0",id); uint64_t message_size = f.flow_size; packets[std::make_pair(channel_id, flow_id)].push_back(MyPacket( stream->current_queue_id, current_sender[0], current_receiver, message_size, channel_id, flow_id)); packets[std::make_pair(channel_id, flow_id)].back().set_flow_id(flow_id); packets[std::make_pair(channel_id, flow_id)].back().sender = nullptr; if (comType == ComType::Reduce_Scatter || (comType == ComType::All_Reduce && toggle)) { processed = true; } else { processed = false; } if ((*non_zero_latency_packets)[channel_id] <= parallel_reduce * 1) { send_back = false; } else { send_back = true; } NPU_to_MA = false; release_packets(channel_id, flow_id, message_size); (*non_zero_latency_packets)[channel_id]--; NcclLog->writeLog(NcclLogLevel::DEBUG,"id: %d (*non_zero_latency_packets)[channel_id] : %d ",id,(*non_zero_latency_packets)[channel_id]); return; } Sys::sys_panic("should not inject nothing!"); } bool NcclTreeFlowModel::ready(int channel_id, int flow_id) { MockNcclLog* NcclLog = MockNcclLog::getInstance(); MyPacket packet; { if (stream->state == StreamState::Created || stream->state == StreamState::Ready) { stream->changeState(StreamState::Executing); } if (!enabled || packets[std::make_pair(channel_id, flow_id)].size() == 0 || _stream_count[channel_id] == 0) { NcclLog->writeLog(NcclLogLevel::DEBUG,"NcclTreeFlowModel not ready!"); return false; } packet = packets[std::make_pair(channel_id, flow_id)].front(); } std::vector<int>recv_prevs; recv_prevs = _flow_models[std::make_pair(channel_id, flow_id)].prev; for (int recv_prev : recv_prevs) { sim_request rcv_req; rcv_req.vnet = this->stream->current_queue_id; rcv_req.layerNum = layer_num; rcv_req.reqCount = packet.msg_size; rcv_req.tag = channel_id; RecvPacketEventHadndlerData* ehd = new RecvPacketEventHadndlerData( stream, stream->owner->id, EventType::PacketReceived, packet.preferred_vnet, packet.stream_num); ehd->flowTag.child_flow_id = -1; ehd->flowTag.current_flow_id = -1; auto flow_model = this->_flow_models[std::make_pair(channel_id,flow_id)]; if(flow_model.parent_flow_id.size()==0 || flow_model.conn_type == "RING"){ ehd->flowTag.tag_id = layer_num*flow_model.chunk_count*m_channels + flow_model.chunk_count*flow_model.channel_id+flow_model.chunk_id; }else{ ehd->flowTag.tag_id = layer_num*flow_model.chunk_count*m_channels + flow_model.chunk_count*flow_model.channel_id+flow_model.chunk_id+1; } ehd->flowTag.channel_id = packet.channel_id; if (free_packets[std::make_pair(channel_id, recv_prev)] > 0) { stream->owner->front_end_sim_recv( 0, Sys::dummy_data, rcv_req.reqCount, UINT8, recv_prev, rcv_req.tag, &rcv_req, &Sys::handleEvent, ehd); } } sim_request snd_req; snd_req.srcRank = id; snd_req.dstRank = packet.preferred_dest; snd_req.tag = channel_id; snd_req.reqType = UINT8; snd_req.vnet = this->stream->current_queue_id; snd_req.layerNum = layer_num; snd_req.reqCount = packet.msg_size; MockNccl::SingleFlow flow_model = this->_flow_models[std::make_pair(channel_id, flow_id)]; snd_req.flowTag.tag_id = layer_num * flow_model.chunk_count * m_channels + flow_model.channel_id * flow_model.chunk_count + flow_model.chunk_id; snd_req.flowTag.channel_id = channel_id; snd_req.flowTag.flow_size = flow_model.flow_size; snd_req.flowTag.current_flow_id = flow_id; snd_req.flowTag.chunk_id = flow_model.chunk_id; snd_req.flowTag.child_flow_id = -1; snd_req.flowTag.tree_flow_list = this->_flow_models[std::make_pair(channel_id, flow_id)].child_flow_id; snd_req.flowTag.sender_node = id; snd_req.flowTag.receiver_node = packet.preferred_dest; snd_req.flowTag.pQps = this->pQps; if (this->comType == ComType::All_Reduce_NVLS) snd_req.flowTag.nvls_on = true; else snd_req.flowTag.nvls_on = false; SendPacketEventHandlerData* send_ehd = new SendPacketEventHandlerData( stream, id, packet.preferred_dest, channel_id, EventType::PacketSentFinshed); stream->owner->front_end_sim_send( 0, Sys::dummy_data, snd_req.reqCount, UINT8, packet.preferred_dest, snd_req.flowTag.tag_id, &snd_req, &Sys::handleEvent, send_ehd); return true; } void NcclTreeFlowModel::exit() { MockNcclLog* NcclLog = MockNcclLog::getInstance(); #ifdef PHY_MTP auto now = std::chrono::system_clock::now(); auto now_us = std::chrono::duration_cast<std::chrono::microseconds>( now.time_since_epoch()) .count(); NcclLog->writeLog( NcclLogLevel::DEBUG, "NcclTreeFlowModel exit time %lld", now_us); end_time = std::chrono::high_resolution_clock::now(); auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time); NcclLog->writeLog(NcclLogLevel::DEBUG,"Communication Latency:%lld us",duration.count()); MPI_Barrier(MPI_COMM_WORLD); sleep(1); #else for(std::pair<std::pair<int, int>, std::list<MyPacket>> packet: packets) { if(packet.second.size() != 0) packet.second.clear(); } #endif stream->owner->proceed_to_next_vnet_baseline((StreamBaseline*)stream); NcclLog->writeLog(NcclLogLevel::DEBUG,"NcclTreeFlowModel exit"); return; } #ifdef PHY_RDMA bool NcclTreeFlowModel::phy_iteratable(int channel_id){ MockNcclLog* NcclLog = MockNcclLog::getInstance(); bool all_send_finished = true, all_recv_finished = true; bool exit_flag = true; if(send_packets!=0||recv_packets!=0){ exit_flag=false; } if(exit_flag){ judge_exit_flag.store(true); return false; } else{ return true; } } bool NcclTreeFlowModel::phy_ready(int channel_id,int flow_id) { MockNcclLog* NcclLog = MockNcclLog::getInstance(); if (stream->state == StreamState::Created || stream->state == StreamState::Ready) { stream->changeState(StreamState::Executing); } MockNccl::SingleFlow flow = _flow_models[std::make_pair(channel_id, flow_id)]; std::vector<int>recv_prevs; recv_prevs = _flow_models[std::make_pair(channel_id, flow_id)].prev; for (int recv_prev : recv_prevs) { sim_request rcv_req; rcv_req.vnet = this->stream->current_queue_id; rcv_req.layerNum = layer_num; rcv_req.reqCount = flow.flow_size; rcv_req.tag = channel_id; RecvPacketEventHadndlerData* ehd = new RecvPacketEventHadndlerData( stream, stream->owner->id, EventType::PacketReceived, stream->current_queue_id, 1); ehd->flowTag.child_flow_id = -1; ehd->flowTag.current_flow_id = -1; auto flow_model = this->_flow_models[std::make_pair(channel_id,flow_id)]; if(flow_model.parent_flow_id.size()==0 || flow_model.conn_type == "RING"){ ehd->flowTag.tag_id = layer_num*flow_model.chunk_count*m_channels + flow_model.chunk_count*flow_model.channel_id+flow_model.chunk_id; }else{ ehd->flowTag.tag_id = layer_num*flow_model.chunk_count*m_channels + flow_model.chunk_count*flow_model.channel_id+flow_model.chunk_id+1; } ehd->flowTag.channel_id = flow.channel_id; if (free_packets[std::make_pair(channel_id, recv_prev)] > 0) { stream->owner->front_end_sim_recv( 0, Sys::dummy_data, rcv_req.reqCount, UINT8, recv_prev, rcv_req.tag, &rcv_req, &Sys::handleEvent, ehd); } } sim_request snd_req; snd_req.srcRank = id; snd_req.dstRank = flow.dest; snd_req.tag = channel_id; snd_req.reqType = UINT8; snd_req.vnet = this->stream->current_queue_id; snd_req.layerNum = layer_num; snd_req.reqCount = flow.flow_size; MockNccl::SingleFlow flow_model = this->_flow_models[std::make_pair(channel_id, flow_id)]; snd_req.flowTag.tag_id = layer_num * flow_model.chunk_count * m_channels + flow_model.channel_id * flow_model.chunk_count + flow_model.chunk_id; snd_req.flowTag.channel_id = channel_id; snd_req.flowTag.flow_size = flow_model.flow_size; snd_req.flowTag.current_flow_id = flow_id; snd_req.flowTag.chunk_id = flow_model.chunk_id; snd_req.flowTag.child_flow_id = -1; snd_req.flowTag.tree_flow_list = this->_flow_models[std::make_pair(channel_id, flow_id)].child_flow_id; snd_req.flowTag.sender_node = id; snd_req.flowTag.receiver_node = flow.dest; snd_req.flowTag.pQps = this->pQps; if (this->comType == ComType::All_Reduce_NVLS) snd_req.flowTag.nvls_on = true; else snd_req.flowTag.nvls_on = false; SendPacketEventHandlerData* send_ehd = new SendPacketEventHandlerData( stream, id, flow.dest, channel_id, EventType::PacketSentFinshed); stream->owner->front_end_sim_send( 0, Sys::dummy_data, snd_req.reqCount, UINT8, flow.dest, snd_req.tag, &snd_req, &Sys::handleEvent, send_ehd); return true; } void NcclTreeFlowModel::waiting_to_exit() { MockNcclLog* NcclLog = MockNcclLog::getInstance(); NcclLog->writeLog( NcclLogLevel::DEBUG, "NcclTreeFlowModel::waiting_to_exit begin "); while (!judge_exit_flag) { }; exit(); return; } #endif } // namespace AstraSim