astra-sim-alibabacloud/astra-sim/network_frontend/ns3/entry.h (357 lines of code) (raw):

/* *Copyright (c) 2024, Alibaba Group; *Licensed under the Apache License, Version 2.0 (the "License"); *you may not use this file except in compliance with the License. *You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 *Unless required by applicable law or agreed to in writing, software *distributed under the License is distributed on an "AS IS" BASIS, *WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *See the License for the specific language governing permissions and *limitations under the License. */ #ifndef __ENTRY_H__ #define __ENTRY_H__ #undef PGO_TRAINING #define PATH_TO_PGO_CONFIG "path_to_pgo_config" #define _QPS_PER_CONNECTION_ 1 #include "common.h" #include "ns3/applications-module.h" #include "ns3/core-module.h" #include "ns3/error-model.h" #include "ns3/global-route-manager.h" #include "ns3/internet-module.h" #include "ns3/ipv4-static-routing-helper.h" #include "ns3/packet.h" #include "ns3/point-to-point-helper.h" #include "ns3/qbb-helper.h" #include <fstream> #include <iostream> #include <ns3/rdma-client-helper.h> #include <ns3/rdma-client.h> #include <ns3/rdma-driver.h> #include <ns3/rdma.h> #include <ns3/sim-setting.h> #include <ns3/switch-node.h> #include <time.h> #include <unordered_map> #include <mutex> #include <vector> #ifdef NS3_MTP #include "ns3/mtp-interface.h" #endif #include <map> #include"astra-sim/system/MockNcclQps.h" #include "astra-sim/system/MockNcclLog.h" using namespace ns3; using namespace std; std::map<std::pair<std::pair<int, int>,int>, AstraSim::ncclFlowTag> receiver_pending_queue; std::map<std::pair<int, std::pair<int, int>>, AstraSim::ncclFlowTag> sender_src_port_map; struct task1 { int src; int dest; int type; uint64_t count; void *fun_arg; void (*msg_handler)(void *fun_arg); double schTime; }; map<std::pair<int, std::pair<int, int>>, struct task1> expeRecvHash; map<std::pair<int, std::pair<int, int>>, int> recvHash; map<std::pair<int, std::pair<int, int>>, struct task1> sentHash; map<std::pair<int, int>, int64_t> nodeHash; map<std::pair<int,std::pair<int,int>>,int> waiting_to_sent_callback; map<std::pair<int,std::pair<int,int>>,int>waiting_to_notify_receiver; map<std::pair<int,std::pair<int,int>>,uint64_t>received_chunksize; map<std::pair<int,std::pair<int,int>>,uint64_t>sent_chunksize; bool is_sending_finished(int src,int dst,AstraSim::ncclFlowTag flowTag){ int tag_id = flowTag.current_flow_id; if (waiting_to_sent_callback.count( std::make_pair(tag_id, std::make_pair(src, dst)))) { if (--waiting_to_sent_callback[std::make_pair( tag_id, std::make_pair(src, dst))] == 0) { waiting_to_sent_callback.erase( std::make_pair(tag_id, std::make_pair(src, dst))); return true; } } return false; } bool is_receive_finished(int src,int dst,AstraSim::ncclFlowTag flowTag){ int tag_id = flowTag.current_flow_id; map<std::pair<int,std::pair<int,int>>,int>::iterator it; MockNcclLog* NcclLog = MockNcclLog::getInstance(); if (waiting_to_notify_receiver.count( std::make_pair(tag_id, std::make_pair(src, dst)))) { NcclLog->writeLog(NcclLogLevel::DEBUG," is_receive_finished waiting_to_notify_receiver tag_id %d src %d dst %d count %d",tag_id,src,dst,waiting_to_notify_receiver[std::make_pair( tag_id, std::make_pair(src, dst))]); if (--waiting_to_notify_receiver[std::make_pair( tag_id, std::make_pair(src, dst))] == 0) { waiting_to_notify_receiver.erase( std::make_pair(tag_id, std::make_pair(src, dst))); return true; } } return false; } void SendFlow(int src, int dst, uint64_t maxPacketCount, void (*msg_handler)(void *fun_arg), void *fun_arg, int tag, AstraSim::sim_request *request) { MockNcclLog*NcclLog = MockNcclLog::getInstance(); uint64_t PacketCount=((maxPacketCount+_QPS_PER_CONNECTION_-1)/_QPS_PER_CONNECTION_); uint64_t leftPacketCount = maxPacketCount; for(int index = 0 ;index<_QPS_PER_CONNECTION_;index++){ uint64_t real_PacketCount = min(PacketCount,leftPacketCount); leftPacketCount-=real_PacketCount; uint32_t port = portNumber[src][dst]++; { #ifdef NS3_MTP MtpInterface::explicitCriticalSection cs; #endif sender_src_port_map[make_pair(port, make_pair(src, dst))] = request->flowTag; #ifdef NS3_MTP cs.ExitSection(); #endif } int flow_id = request->flowTag.current_flow_id; bool nvls_on = request->flowTag.nvls_on; int pg = 3, dport = 100; int send_lat = 6000; const char* send_lat_env = std::getenv("AS_SEND_LAT"); if (send_lat_env) { try { send_lat = std::stoi(send_lat_env); } catch (const std::invalid_argument& e) { NcclLog->writeLog(NcclLogLevel::ERROR,"send_lat set error"); exit(-1); } } send_lat *= 1000; flow_input.idx++; if(real_PacketCount == 0) real_PacketCount = 1; MockNcclLog* NcclLog = MockNcclLog::getInstance(); NcclLog->writeLog(NcclLogLevel::DEBUG," 发包事件 %dSendFlow to %d channelid: %d flow_id %d srcip %d dstip %d size: %d at the tick: %d",src,dst,tag,flow_id,serverAddress[src],serverAddress[dst],maxPacketCount,AstraSim::Sys::boostedTick()); NcclLog->writeLog(NcclLogLevel::DEBUG," request->flowTag 发包事件 %dSendFlow to %d tag_id: %d flow_id %d srcip %d dstip %d size: %d at the tick: %d",request->flowTag.sender_node,request->flowTag.receiver_node,request->flowTag.tag_id,request->flowTag.current_flow_id,serverAddress[src],serverAddress[dst],maxPacketCount,AstraSim::Sys::boostedTick()); RdmaClientHelper clientHelper( pg, serverAddress[src], serverAddress[dst], port, dport, real_PacketCount, has_win ? (global_t == 1 ? maxBdp : pairBdp[n.Get(src)][n.Get(dst)]) : 0, global_t == 1 ? maxRtt : pairRtt[src][dst], msg_handler, fun_arg, tag, src, dst); if(nvls_on) clientHelper.SetAttribute("NVLS_enable", UintegerValue (1)); { #ifdef NS3_MTP MtpInterface::explicitCriticalSection cs; #endif ApplicationContainer appCon = clientHelper.Install(n.Get(src)); appCon.Start(Time(send_lat)); waiting_to_sent_callback[std::make_pair(request->flowTag.current_flow_id,std::make_pair(src,dst))]++; waiting_to_notify_receiver[std::make_pair(request->flowTag.current_flow_id,std::make_pair(src,dst))]++; #ifdef NS3_MTP cs.ExitSection(); #endif } NcclLog->writeLog(NcclLogLevel::DEBUG,"waiting_to_notify_receiver current_flow_id %d src %d dst %d count %d",request->flowTag.current_flow_id,src,dst,waiting_to_notify_receiver[std::make_pair(request->flowTag.tag_id,std::make_pair(src,dst))]); } } void notify_receiver_receive_data(int sender_node, int receiver_node, uint64_t message_size, AstraSim::ncclFlowTag flowTag) { { #ifdef NS3_MTP MtpInterface::explicitCriticalSection cs; #endif MockNcclLog* NcclLog = MockNcclLog::getInstance(); NcclLog->writeLog(NcclLogLevel::DEBUG," %d notify recevier: %d message size: %d",sender_node,receiver_node,message_size); int tag = flowTag.tag_id; if (expeRecvHash.find(make_pair( tag, make_pair(sender_node, receiver_node))) != expeRecvHash.end()) { task1 t2 = expeRecvHash[make_pair(tag, make_pair(sender_node, receiver_node))]; MockNcclLog* NcclLog = MockNcclLog::getInstance(); NcclLog->writeLog(NcclLogLevel::DEBUG," %d notify recevier: %d message size: %d t2.count: %d channle id: %d",sender_node,receiver_node,message_size,t2.count,flowTag.channel_id); AstraSim::RecvPacketEventHadndlerData* ehd = (AstraSim::RecvPacketEventHadndlerData*) t2.fun_arg; if (message_size == t2.count) { NcclLog->writeLog(NcclLogLevel::DEBUG," message_size = t2.count expeRecvHash.erase %d notify recevier: %d message size: %d channel_id %d",sender_node,receiver_node,message_size,tag); expeRecvHash.erase(make_pair(tag, make_pair(sender_node, receiver_node))); #ifdef NS3_MTP cs.ExitSection(); #endif assert(ehd->flowTag.current_flow_id == -1 && ehd->flowTag.child_flow_id == -1); ehd->flowTag = flowTag; t2.msg_handler(t2.fun_arg); goto receiver_end_1st_section; } else if (message_size > t2.count) { recvHash[make_pair(tag, make_pair(sender_node, receiver_node))] = message_size - t2.count; NcclLog->writeLog(NcclLogLevel::DEBUG,"message_size > t2.count expeRecvHash.erase %d notify recevier: %d message size: %d channel_id %d",sender_node,receiver_node,message_size,tag); expeRecvHash.erase(make_pair(tag, make_pair(sender_node, receiver_node))); #ifdef NS3_MTP cs.ExitSection(); #endif assert(ehd->flowTag.current_flow_id == -1 && ehd->flowTag.child_flow_id == -1); ehd->flowTag = flowTag; t2.msg_handler(t2.fun_arg); goto receiver_end_1st_section; } else { t2.count -= message_size; expeRecvHash[make_pair(tag, make_pair(sender_node, receiver_node))] = t2; } } else { receiver_pending_queue[std::make_pair(std::make_pair(receiver_node, sender_node),tag)] = flowTag; if (recvHash.find(make_pair(tag, make_pair(sender_node, receiver_node))) == recvHash.end()) { recvHash[make_pair(tag, make_pair(sender_node, receiver_node))] = message_size; } else { recvHash[make_pair(tag, make_pair(sender_node, receiver_node))] += message_size; } } #ifdef NS3_MTP cs.ExitSection(); #endif receiver_end_1st_section: { #ifdef NS3_MTP MtpInterface::explicitCriticalSection cs2; #endif if (nodeHash.find(make_pair(receiver_node, 1)) == nodeHash.end()) { nodeHash[make_pair(receiver_node, 1)] = message_size; } else { nodeHash[make_pair(receiver_node, 1)] += message_size; } #ifdef NS3_MTP cs2.ExitSection(); #endif } } } void notify_sender_sending_finished(int sender_node, int receiver_node, uint64_t message_size, AstraSim::ncclFlowTag flowTag) { { MockNcclLog * NcclLog = MockNcclLog::getInstance(); #ifdef NS3_MTP MtpInterface::explicitCriticalSection cs; #endif int tag = flowTag.tag_id; if (sentHash.find(make_pair(tag, make_pair(sender_node, receiver_node))) != sentHash.end()) { task1 t2 = sentHash[make_pair(tag, make_pair(sender_node, receiver_node))]; AstraSim::SendPacketEventHandlerData* ehd = (AstraSim::SendPacketEventHandlerData*) t2.fun_arg; ehd->flowTag=flowTag; if (t2.count == message_size) { sentHash.erase(make_pair(tag, make_pair(sender_node, receiver_node))); if (nodeHash.find(make_pair(sender_node, 0)) == nodeHash.end()) { nodeHash[make_pair(sender_node, 0)] = message_size; } else { nodeHash[make_pair(sender_node, 0)] += message_size; } #ifdef NS3_MTP cs.ExitSection(); #endif t2.msg_handler(t2.fun_arg); goto sender_end_1st_section; }else{ NcclLog->writeLog(NcclLogLevel::ERROR,"sentHash msg size != sender_node %d receiver_node %d message_size %lu flow_id ",sender_node,receiver_node,message_size); } }else{ NcclLog->writeLog(NcclLogLevel::ERROR,"sentHash cann't find sender_node %d receiver_node %d message_size %lu",sender_node,receiver_node,message_size); } #ifdef NS3_MTP cs.ExitSection(); #endif } sender_end_1st_section: return; } void notify_sender_packet_arrivered_receiver(int sender_node, int receiver_node, uint64_t message_size, AstraSim::ncclFlowTag flowTag) { int tag = flowTag.channel_id; if (sentHash.find(make_pair(tag, make_pair(sender_node, receiver_node))) != sentHash.end()) { task1 t2 = sentHash[make_pair(tag, make_pair(sender_node, receiver_node))]; AstraSim::SendPacketEventHandlerData* ehd = (AstraSim::SendPacketEventHandlerData*) t2.fun_arg; ehd->flowTag=flowTag; if (t2.count == message_size) { sentHash.erase(make_pair(tag, make_pair(sender_node, receiver_node))); if (nodeHash.find(make_pair(sender_node, 0)) == nodeHash.end()) { nodeHash[make_pair(sender_node, 0)] = message_size; } else { nodeHash[make_pair(sender_node, 0)] += message_size; } t2.msg_handler(t2.fun_arg); } } } void qp_finish(FILE *fout, Ptr<RdmaQueuePair> q) { uint32_t sid = ip_to_node_id(q->sip), did = ip_to_node_id(q->dip); uint64_t base_rtt = pairRtt[sid][did], b = pairBw[sid][did]; uint32_t total_bytes = q->m_size + ((q->m_size - 1) / packet_payload_size + 1) * (CustomHeader::GetStaticWholeHeaderSize() - IntHeader::GetStaticSize()); uint64_t standalone_fct = base_rtt + total_bytes * 8000000000lu / b; fprintf(fout, "%08x %08x %u %u %lu %lu %lu %lu\n", q->sip.Get(), q->dip.Get(), q->sport, q->dport, q->m_size, q->startTime.GetTimeStep(), (Simulator::Now() - q->startTime).GetTimeStep(), standalone_fct); fflush(fout); AstraSim::ncclFlowTag flowTag; uint64_t notify_size; { #ifdef NS3_MTP MtpInterface::explicitCriticalSection cs; #endif Ptr<Node> dstNode = n.Get(did); Ptr<RdmaDriver> rdma = dstNode->GetObject<RdmaDriver>(); rdma->m_rdma->DeleteRxQp(q->sip.Get(), q->m_pg, q->sport); MockNcclLog* NcclLog = MockNcclLog::getInstance(); NcclLog->writeLog(NcclLogLevel::DEBUG,"qp finish, src: %d did: %d port: %d total bytes: %d at the tick: %d",sid,did,q->sport,q->m_size,AstraSim::Sys::boostedTick()); if (sender_src_port_map.find(make_pair(q->sport, make_pair(sid, did))) == sender_src_port_map.end()) { NcclLog->writeLog(NcclLogLevel::ERROR,"could not find the tag, there must be something wrong"); exit(-1); } flowTag = sender_src_port_map[make_pair(q->sport, make_pair(sid, did))]; sender_src_port_map.erase(make_pair(q->sport, make_pair(sid, did))); received_chunksize[std::make_pair(flowTag.current_flow_id,std::make_pair(sid,did))]+=q->m_size; if(!is_receive_finished(sid,did,flowTag)) { #ifdef NS3_MTP cs.ExitSection(); #endif return; } notify_size = received_chunksize[std::make_pair(flowTag.current_flow_id,std::make_pair(sid,did))]; received_chunksize.erase(std::make_pair(flowTag.current_flow_id,std::make_pair(sid,did))); #ifdef NS3_MTP cs.ExitSection(); #endif } notify_receiver_receive_data(sid, did, notify_size, flowTag); } void send_finish(FILE *fout, Ptr<RdmaQueuePair> q) { uint32_t sid = ip_to_node_id(q->sip), did = ip_to_node_id(q->dip); AstraSim::ncclFlowTag flowTag; MockNcclLog* NcclLog = MockNcclLog::getInstance(); NcclLog->writeLog(NcclLogLevel::DEBUG,"数据包出发送网卡 send finish, src: %d did: %d port: %d srcip %d dstip %d total bytes: %d at the tick: %d",sid,did,q->sport,q->sip,q->dip,q->m_size,AstraSim::Sys::boostedTick()); int all_sent_chunksize; { #ifdef NS3_MTP MtpInterface::explicitCriticalSection cs; #endif flowTag = sender_src_port_map[make_pair(q->sport, make_pair(sid, did))]; sent_chunksize[std::make_pair(flowTag.current_flow_id,std::make_pair(sid,did))]+=q->m_size; if(!is_sending_finished(sid,did,flowTag)) { #ifdef NS3_MTP cs.ExitSection(); #endif return; } all_sent_chunksize = sent_chunksize[std::make_pair(flowTag.current_flow_id,std::make_pair(sid,did))]; sent_chunksize.erase(std::make_pair(flowTag.current_flow_id,std::make_pair(sid,did))); #ifdef NS3_MTP cs.ExitSection(); #endif } notify_sender_sending_finished(sid, did, all_sent_chunksize, flowTag); } int main1(string network_topo,string network_conf) { clock_t begint, endt; begint = clock(); if (!ReadConf(network_topo,network_conf)) return -1; SetConfig(); SetupNetwork(qp_finish,send_finish); std::cout << "Running Simulation.\n"; fflush(stdout); NS_LOG_INFO("Run Simulation."); endt = clock(); return 0; } #endif