astra-sim-alibabacloud/astra-sim/network_frontend/ns3/entry.h (357 lines of code) (raw):
/*
*Copyright (c) 2024, Alibaba Group;
*Licensed under the Apache License, Version 2.0 (the "License");
*you may not use this file except in compliance with the License.
*You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*Unless required by applicable law or agreed to in writing, software
*distributed under the License is distributed on an "AS IS" BASIS,
*WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*See the License for the specific language governing permissions and
*limitations under the License.
*/
#ifndef __ENTRY_H__
#define __ENTRY_H__
#undef PGO_TRAINING
#define PATH_TO_PGO_CONFIG "path_to_pgo_config"
#define _QPS_PER_CONNECTION_ 1
#include "common.h"
#include "ns3/applications-module.h"
#include "ns3/core-module.h"
#include "ns3/error-model.h"
#include "ns3/global-route-manager.h"
#include "ns3/internet-module.h"
#include "ns3/ipv4-static-routing-helper.h"
#include "ns3/packet.h"
#include "ns3/point-to-point-helper.h"
#include "ns3/qbb-helper.h"
#include <fstream>
#include <iostream>
#include <ns3/rdma-client-helper.h>
#include <ns3/rdma-client.h>
#include <ns3/rdma-driver.h>
#include <ns3/rdma.h>
#include <ns3/sim-setting.h>
#include <ns3/switch-node.h>
#include <time.h>
#include <unordered_map>
#include <mutex>
#include <vector>
#ifdef NS3_MTP
#include "ns3/mtp-interface.h"
#endif
#include <map>
#include"astra-sim/system/MockNcclQps.h"
#include "astra-sim/system/MockNcclLog.h"
using namespace ns3;
using namespace std;
// Flow tags of messages that were reported to notify_receiver_receive_data
// while no expeRecvHash entry existed for them. Keyed by
// ((receiver_node, sender_node), tag_id).
std::map<std::pair<std::pair<int, int>,int>, AstraSim::ncclFlowTag> receiver_pending_queue;
// Flow tag of each started queue pair, keyed by (source port, (src, dst)).
// Written by SendFlow, read by send_finish, and looked up then erased by
// qp_finish.
std::map<std::pair<int, std::pair<int, int>>, AstraSim::ncclFlowTag> sender_src_port_map;
// Bookkeeping record for one pending send or receive. Instances are stored in
// expeRecvHash / sentHash and consumed by the notify_* functions below.
struct task1 {
// NOTE(review): src, dest and type are not read by the functions in this
// header; presumably source node, destination node and a send/receive
// discriminator -- confirm against the code that fills the record.
int src;
int dest;
int type;
// Message size this task is waiting for; compared with (and, for partial
// receives, reduced by) the message_size reported by the network.
uint64_t count;
// Opaque argument handed to msg_handler. The notify_* functions cast it to
// AstraSim::RecvPacketEventHadndlerData* / SendPacketEventHandlerData* in
// order to store the flow tag before invoking the handler.
void *fun_arg;
// Completion callback, invoked as msg_handler(fun_arg).
void (*msg_handler)(void *fun_arg);
// NOTE(review): not read in this header; presumably the scheduling time.
double schTime;
};
// Receive tasks waiting for data, keyed by (tag_id, (sender, receiver)).
// Consumed/updated by notify_receiver_receive_data.
map<std::pair<int, std::pair<int, int>>, struct task1> expeRecvHash;
// Size of data that was reported as received without being consumed by an
// expeRecvHash task (no task present, or the message was larger than the
// task's count). Same key layout as expeRecvHash.
map<std::pair<int, std::pair<int, int>>, int> recvHash;
// Send tasks waiting for completion, keyed by (tag, (sender, receiver)).
map<std::pair<int, std::pair<int, int>>, struct task1> sentHash;
// Per-node size totals, keyed by (node, direction): the functions in this
// header add to direction 0 on completed sends and to direction 1 on
// received data.
map<std::pair<int, int>, int64_t> nodeHash;
// Number of queue pairs of a flow whose send_finish has not yet been seen;
// keyed by (current_flow_id, (src, dst)). Incremented in SendFlow,
// decremented in is_sending_finished.
map<std::pair<int,std::pair<int,int>>,int> waiting_to_sent_callback;
// Number of queue pairs of a flow whose qp_finish has not yet been seen;
// same key layout. Incremented in SendFlow, decremented in
// is_receive_finished.
map<std::pair<int,std::pair<int,int>>,int>waiting_to_notify_receiver;
// Sum of q->m_size over the queue pairs of a flow that have reached
// qp_finish; erased once the flow is complete.
map<std::pair<int,std::pair<int,int>>,uint64_t>received_chunksize;
// Sum of q->m_size over the queue pairs of a flow that have reached
// send_finish; erased once the flow is complete.
map<std::pair<int,std::pair<int,int>>,uint64_t>sent_chunksize;
bool is_sending_finished(int src,int dst,AstraSim::ncclFlowTag flowTag){
int tag_id = flowTag.current_flow_id;
if (waiting_to_sent_callback.count(
std::make_pair(tag_id, std::make_pair(src, dst)))) {
if (--waiting_to_sent_callback[std::make_pair(
tag_id, std::make_pair(src, dst))] == 0) {
waiting_to_sent_callback.erase(
std::make_pair(tag_id, std::make_pair(src, dst)));
return true;
}
}
return false;
}
bool is_receive_finished(int src,int dst,AstraSim::ncclFlowTag flowTag){
int tag_id = flowTag.current_flow_id;
map<std::pair<int,std::pair<int,int>>,int>::iterator it;
MockNcclLog* NcclLog = MockNcclLog::getInstance();
if (waiting_to_notify_receiver.count(
std::make_pair(tag_id, std::make_pair(src, dst)))) {
NcclLog->writeLog(NcclLogLevel::DEBUG," is_receive_finished waiting_to_notify_receiver tag_id %d src %d dst %d count %d",tag_id,src,dst,waiting_to_notify_receiver[std::make_pair(
tag_id, std::make_pair(src, dst))]);
if (--waiting_to_notify_receiver[std::make_pair(
tag_id, std::make_pair(src, dst))] == 0) {
waiting_to_notify_receiver.erase(
std::make_pair(tag_id, std::make_pair(src, dst)));
return true;
}
}
return false;
}
void SendFlow(int src, int dst, uint64_t maxPacketCount,
void (*msg_handler)(void *fun_arg), void *fun_arg, int tag, AstraSim::sim_request *request) {
MockNcclLog*NcclLog = MockNcclLog::getInstance();
uint64_t PacketCount=((maxPacketCount+_QPS_PER_CONNECTION_-1)/_QPS_PER_CONNECTION_);
uint64_t leftPacketCount = maxPacketCount;
for(int index = 0 ;index<_QPS_PER_CONNECTION_;index++){
uint64_t real_PacketCount = min(PacketCount,leftPacketCount);
leftPacketCount-=real_PacketCount;
uint32_t port = portNumber[src][dst]++;
{
#ifdef NS3_MTP
MtpInterface::explicitCriticalSection cs;
#endif
sender_src_port_map[make_pair(port, make_pair(src, dst))] = request->flowTag;
#ifdef NS3_MTP
cs.ExitSection();
#endif
}
int flow_id = request->flowTag.current_flow_id;
bool nvls_on = request->flowTag.nvls_on;
int pg = 3, dport = 100;
int send_lat = 6000;
const char* send_lat_env = std::getenv("AS_SEND_LAT");
if (send_lat_env) {
try {
send_lat = std::stoi(send_lat_env);
} catch (const std::invalid_argument& e) {
NcclLog->writeLog(NcclLogLevel::ERROR,"send_lat set error");
exit(-1);
}
}
send_lat *= 1000;
flow_input.idx++;
if(real_PacketCount == 0) real_PacketCount = 1;
MockNcclLog* NcclLog = MockNcclLog::getInstance();
NcclLog->writeLog(NcclLogLevel::DEBUG," 发包事件 %dSendFlow to %d channelid: %d flow_id %d srcip %d dstip %d size: %d at the tick: %d",src,dst,tag,flow_id,serverAddress[src],serverAddress[dst],maxPacketCount,AstraSim::Sys::boostedTick());
NcclLog->writeLog(NcclLogLevel::DEBUG," request->flowTag 发包事件 %dSendFlow to %d tag_id: %d flow_id %d srcip %d dstip %d size: %d at the tick: %d",request->flowTag.sender_node,request->flowTag.receiver_node,request->flowTag.tag_id,request->flowTag.current_flow_id,serverAddress[src],serverAddress[dst],maxPacketCount,AstraSim::Sys::boostedTick());
RdmaClientHelper clientHelper(
pg, serverAddress[src], serverAddress[dst], port, dport, real_PacketCount,
has_win ? (global_t == 1 ? maxBdp : pairBdp[n.Get(src)][n.Get(dst)]) : 0,
global_t == 1 ? maxRtt : pairRtt[src][dst], msg_handler, fun_arg, tag,
src, dst);
if(nvls_on) clientHelper.SetAttribute("NVLS_enable", UintegerValue (1));
{
#ifdef NS3_MTP
MtpInterface::explicitCriticalSection cs;
#endif
ApplicationContainer appCon = clientHelper.Install(n.Get(src));
appCon.Start(Time(send_lat));
waiting_to_sent_callback[std::make_pair(request->flowTag.current_flow_id,std::make_pair(src,dst))]++;
waiting_to_notify_receiver[std::make_pair(request->flowTag.current_flow_id,std::make_pair(src,dst))]++;
#ifdef NS3_MTP
cs.ExitSection();
#endif
}
NcclLog->writeLog(NcclLogLevel::DEBUG,"waiting_to_notify_receiver current_flow_id %d src %d dst %d count %d",request->flowTag.current_flow_id,src,dst,waiting_to_notify_receiver[std::make_pair(request->flowTag.tag_id,std::make_pair(src,dst))]);
}
}
void notify_receiver_receive_data(int sender_node, int receiver_node,
uint64_t message_size, AstraSim::ncclFlowTag flowTag) {
{
#ifdef NS3_MTP
MtpInterface::explicitCriticalSection cs;
#endif
MockNcclLog* NcclLog = MockNcclLog::getInstance();
NcclLog->writeLog(NcclLogLevel::DEBUG," %d notify recevier: %d message size: %d",sender_node,receiver_node,message_size);
int tag = flowTag.tag_id;
if (expeRecvHash.find(make_pair(
tag, make_pair(sender_node, receiver_node))) != expeRecvHash.end()) {
task1 t2 =
expeRecvHash[make_pair(tag, make_pair(sender_node, receiver_node))];
MockNcclLog* NcclLog = MockNcclLog::getInstance();
NcclLog->writeLog(NcclLogLevel::DEBUG," %d notify recevier: %d message size: %d t2.count: %d channle id: %d",sender_node,receiver_node,message_size,t2.count,flowTag.channel_id);
AstraSim::RecvPacketEventHadndlerData* ehd = (AstraSim::RecvPacketEventHadndlerData*) t2.fun_arg;
if (message_size == t2.count) {
NcclLog->writeLog(NcclLogLevel::DEBUG," message_size = t2.count expeRecvHash.erase %d notify recevier: %d message size: %d channel_id %d",sender_node,receiver_node,message_size,tag);
expeRecvHash.erase(make_pair(tag, make_pair(sender_node, receiver_node)));
#ifdef NS3_MTP
cs.ExitSection();
#endif
assert(ehd->flowTag.current_flow_id == -1 && ehd->flowTag.child_flow_id == -1);
ehd->flowTag = flowTag;
t2.msg_handler(t2.fun_arg);
goto receiver_end_1st_section;
} else if (message_size > t2.count) {
recvHash[make_pair(tag, make_pair(sender_node, receiver_node))] =
message_size - t2.count;
NcclLog->writeLog(NcclLogLevel::DEBUG,"message_size > t2.count expeRecvHash.erase %d notify recevier: %d message size: %d channel_id %d",sender_node,receiver_node,message_size,tag);
expeRecvHash.erase(make_pair(tag, make_pair(sender_node, receiver_node)));
#ifdef NS3_MTP
cs.ExitSection();
#endif
assert(ehd->flowTag.current_flow_id == -1 && ehd->flowTag.child_flow_id == -1);
ehd->flowTag = flowTag;
t2.msg_handler(t2.fun_arg);
goto receiver_end_1st_section;
} else {
t2.count -= message_size;
expeRecvHash[make_pair(tag, make_pair(sender_node, receiver_node))] = t2;
}
} else {
receiver_pending_queue[std::make_pair(std::make_pair(receiver_node, sender_node),tag)] = flowTag;
if (recvHash.find(make_pair(tag, make_pair(sender_node, receiver_node))) ==
recvHash.end()) {
recvHash[make_pair(tag, make_pair(sender_node, receiver_node))] =
message_size;
} else {
recvHash[make_pair(tag, make_pair(sender_node, receiver_node))] +=
message_size;
}
}
#ifdef NS3_MTP
cs.ExitSection();
#endif
receiver_end_1st_section:
{
#ifdef NS3_MTP
MtpInterface::explicitCriticalSection cs2;
#endif
if (nodeHash.find(make_pair(receiver_node, 1)) == nodeHash.end()) {
nodeHash[make_pair(receiver_node, 1)] = message_size;
} else {
nodeHash[make_pair(receiver_node, 1)] += message_size;
}
#ifdef NS3_MTP
cs2.ExitSection();
#endif
}
}
}
void notify_sender_sending_finished(int sender_node, int receiver_node,
uint64_t message_size, AstraSim::ncclFlowTag flowTag) {
{
MockNcclLog * NcclLog = MockNcclLog::getInstance();
#ifdef NS3_MTP
MtpInterface::explicitCriticalSection cs;
#endif
int tag = flowTag.tag_id;
if (sentHash.find(make_pair(tag, make_pair(sender_node, receiver_node))) !=
sentHash.end()) {
task1 t2 = sentHash[make_pair(tag, make_pair(sender_node, receiver_node))];
AstraSim::SendPacketEventHandlerData* ehd = (AstraSim::SendPacketEventHandlerData*) t2.fun_arg;
ehd->flowTag=flowTag;
if (t2.count == message_size) {
sentHash.erase(make_pair(tag, make_pair(sender_node, receiver_node)));
if (nodeHash.find(make_pair(sender_node, 0)) == nodeHash.end()) {
nodeHash[make_pair(sender_node, 0)] = message_size;
} else {
nodeHash[make_pair(sender_node, 0)] += message_size;
}
#ifdef NS3_MTP
cs.ExitSection();
#endif
t2.msg_handler(t2.fun_arg);
goto sender_end_1st_section;
}else{
NcclLog->writeLog(NcclLogLevel::ERROR,"sentHash msg size != sender_node %d receiver_node %d message_size %lu flow_id ",sender_node,receiver_node,message_size);
}
}else{
NcclLog->writeLog(NcclLogLevel::ERROR,"sentHash cann't find sender_node %d receiver_node %d message_size %lu",sender_node,receiver_node,message_size);
}
#ifdef NS3_MTP
cs.ExitSection();
#endif
}
sender_end_1st_section:
return;
}
void notify_sender_packet_arrivered_receiver(int sender_node, int receiver_node,
uint64_t message_size, AstraSim::ncclFlowTag flowTag) {
int tag = flowTag.channel_id;
if (sentHash.find(make_pair(tag, make_pair(sender_node, receiver_node))) !=
sentHash.end()) {
task1 t2 = sentHash[make_pair(tag, make_pair(sender_node, receiver_node))];
AstraSim::SendPacketEventHandlerData* ehd = (AstraSim::SendPacketEventHandlerData*) t2.fun_arg;
ehd->flowTag=flowTag;
if (t2.count == message_size) {
sentHash.erase(make_pair(tag, make_pair(sender_node, receiver_node)));
if (nodeHash.find(make_pair(sender_node, 0)) == nodeHash.end()) {
nodeHash[make_pair(sender_node, 0)] = message_size;
} else {
nodeHash[make_pair(sender_node, 0)] += message_size;
}
t2.msg_handler(t2.fun_arg);
}
}
}
void qp_finish(FILE *fout, Ptr<RdmaQueuePair> q) {
uint32_t sid = ip_to_node_id(q->sip), did = ip_to_node_id(q->dip);
uint64_t base_rtt = pairRtt[sid][did], b = pairBw[sid][did];
uint32_t total_bytes =
q->m_size +
((q->m_size - 1) / packet_payload_size + 1) *
(CustomHeader::GetStaticWholeHeaderSize() -
IntHeader::GetStaticSize());
uint64_t standalone_fct = base_rtt + total_bytes * 8000000000lu / b;
fprintf(fout, "%08x %08x %u %u %lu %lu %lu %lu\n", q->sip.Get(), q->dip.Get(),
q->sport, q->dport, q->m_size, q->startTime.GetTimeStep(),
(Simulator::Now() - q->startTime).GetTimeStep(), standalone_fct);
fflush(fout);
AstraSim::ncclFlowTag flowTag;
uint64_t notify_size;
{
#ifdef NS3_MTP
MtpInterface::explicitCriticalSection cs;
#endif
Ptr<Node> dstNode = n.Get(did);
Ptr<RdmaDriver> rdma = dstNode->GetObject<RdmaDriver>();
rdma->m_rdma->DeleteRxQp(q->sip.Get(), q->m_pg, q->sport);
MockNcclLog* NcclLog = MockNcclLog::getInstance();
NcclLog->writeLog(NcclLogLevel::DEBUG,"qp finish, src: %d did: %d port: %d total bytes: %d at the tick: %d",sid,did,q->sport,q->m_size,AstraSim::Sys::boostedTick());
if (sender_src_port_map.find(make_pair(q->sport, make_pair(sid, did))) ==
sender_src_port_map.end()) {
NcclLog->writeLog(NcclLogLevel::ERROR,"could not find the tag, there must be something wrong");
exit(-1);
}
flowTag = sender_src_port_map[make_pair(q->sport, make_pair(sid, did))];
sender_src_port_map.erase(make_pair(q->sport, make_pair(sid, did)));
received_chunksize[std::make_pair(flowTag.current_flow_id,std::make_pair(sid,did))]+=q->m_size;
if(!is_receive_finished(sid,did,flowTag)) {
#ifdef NS3_MTP
cs.ExitSection();
#endif
return;
}
notify_size = received_chunksize[std::make_pair(flowTag.current_flow_id,std::make_pair(sid,did))];
received_chunksize.erase(std::make_pair(flowTag.current_flow_id,std::make_pair(sid,did)));
#ifdef NS3_MTP
cs.ExitSection();
#endif
}
notify_receiver_receive_data(sid, did, notify_size, flowTag);
}
/**
 * Send-completion callback for one queue pair (handed to SetupNetwork in
 * main1).
 *
 * Resolves the queue pair's flow tag through sender_src_port_map, adds the
 * queue pair's size to the flow's sent_chunksize total and, once
 * is_sending_finished reports that every queue pair of the flow has finished
 * sending, forwards the accumulated size to notify_sender_sending_finished.
 *
 * @param fout Unused here; part of the callback signature.
 * @param q    The queue pair that finished sending.
 */
void send_finish(FILE *fout, Ptr<RdmaQueuePair> q) {
  uint32_t sid = ip_to_node_id(q->sip), did = ip_to_node_id(q->dip);
  AstraSim::ncclFlowTag flowTag;
  MockNcclLog* NcclLog = MockNcclLog::getInstance();
  NcclLog->writeLog(NcclLogLevel::DEBUG,"数据包出发送网卡 send finish, src: %d did: %d port: %d srcip %d dstip %d total bytes: %d at the tick: %d",sid,did,q->sport,q->sip,q->dip,q->m_size,AstraSim::Sys::boostedTick());

  // Must be as wide as the sent_chunksize values and the message_size
  // parameter it is forwarded to. With a plain int, totals of 2 GiB or more
  // were corrupted, the size check in notify_sender_sending_finished failed
  // and the send-completion handler was never invoked.
  uint64_t all_sent_chunksize;
  {
#ifdef NS3_MTP
    MtpInterface::explicitCriticalSection cs;
#endif
    // The entry is only read here; qp_finish is the one that erases it.
    flowTag = sender_src_port_map[make_pair(q->sport, make_pair(sid, did))];

    const std::pair<int, std::pair<int, int>> flow_key(
        flowTag.current_flow_id, std::make_pair(sid, did));
    sent_chunksize[flow_key] += q->m_size;

    if (!is_sending_finished(sid, did, flowTag)) {
      // Other queue pairs of this flow are still sending.
#ifdef NS3_MTP
      cs.ExitSection();
#endif
      return;
    }

    all_sent_chunksize = sent_chunksize[flow_key];
    sent_chunksize.erase(flow_key);
#ifdef NS3_MTP
    cs.ExitSection();
#endif
  }
  notify_sender_sending_finished(sid, did, all_sent_chunksize, flowTag);
}
/**
 * Prepare the ns-3 network for a simulation run.
 *
 * Reads the topology and configuration via ReadConf, applies the settings
 * with SetConfig, and builds the network with qp_finish / send_finish as
 * completion callbacks. The simulator itself is not started here.
 *
 * @param network_topo Topology argument passed to ReadConf.
 * @param network_conf Configuration argument passed to ReadConf.
 * @return -1 if ReadConf fails, 0 otherwise.
 */
int main1(string network_topo,string network_conf) {
  // Setup start/end timestamps are taken but currently not reported.
  clock_t setup_begin;
  clock_t setup_end;
  setup_begin = clock();

  const bool conf_loaded = ReadConf(network_topo,network_conf);
  if (!conf_loaded) {
    return -1;
  }

  SetConfig();
  SetupNetwork(qp_finish,send_finish);

  std::cout << "Running Simulation.\n";
  fflush(stdout);
  NS_LOG_INFO("Run Simulation.");

  setup_end = clock();
  return 0;
}
#endif