astra-sim-alibabacloud/astra-sim/network_frontend/ns3/AstraSimNetwork.cc (299 lines of code) (raw):
/*
*Copyright (c) 2024, Alibaba Group;
*Licensed under the Apache License, Version 2.0 (the "License");
*you may not use this file except in compliance with the License.
*You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*Unless required by applicable law or agreed to in writing, software
*distributed under the License is distributed on an "AS IS" BASIS,
*WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*See the License for the specific language governing permissions and
*limitations under the License.
*/
#include "astra-sim/system/AstraNetworkAPI.hh"
#include "astra-sim/system/Sys.hh"
#include "astra-sim/system/RecvPacketEventHadndlerData.hh"
#include "astra-sim/system/Common.hh"
#include "astra-sim/system/MockNcclLog.h"
#include "ns3/applications-module.h"
#include "ns3/core-module.h"
#include "ns3/csma-module.h"
#include "ns3/internet-module.h"
#include "ns3/network-module.h"
#include "entry.h"
#include <execinfo.h>
#include <fstream>
#include <iostream>
#include <queue>
#include <stdio.h>
#include <string>
#include <thread>
#include <unistd.h>
#include <vector>
#ifdef NS3_MTP
#include "ns3/mtp-interface.h"
#endif
#ifdef NS3_MPI
#include "ns3/mpi-interface.h"
#include <mpi.h>
#endif
#define RESULT_PATH "./ncclFlowModel_"
using namespace std;
using namespace ns3;
extern std::map<std::pair<std::pair<int, int>,int>, AstraSim::ncclFlowTag> receiver_pending_queue;
extern uint32_t node_num, switch_num, link_num, trace_num, nvswitch_num, gpus_per_server;
extern GPUType gpu_type;
extern std::vector<int>NVswitchs;
struct sim_event {
void *buffer;
uint64_t count;
int type;
int dst;
int tag;
string fnType;
};
class ASTRASimNetwork : public AstraSim::AstraNetworkAPI {
private:
int npu_offset;
public:
queue<sim_event> sim_event_queue;
ASTRASimNetwork(int rank, int npu_offset) : AstraNetworkAPI(rank) {
this->npu_offset = npu_offset;
}
~ASTRASimNetwork() {}
int sim_comm_size(AstraSim::sim_comm comm, int *size) { return 0; }
int sim_finish() {
for (auto it = nodeHash.begin(); it != nodeHash.end(); it++) {
pair<int, int> p = it->first;
if (p.second == 0) {
std::cout << "sim_finish on sent, " << " Thread id: " << pthread_self() << std::endl;
cout << "All data sent from node " << p.first << " is " << it->second
<< "\n";
} else {
std::cout << "sim_finish on received, " << " Thread id: " << pthread_self() << std::endl;
cout << "All data received by node " << p.first << " is " << it->second
<< "\n";
}
}
exit(0);
return 0;
}
double sim_time_resolution() { return 0; }
int sim_init(AstraSim::AstraMemoryAPI *MEM) { return 0; }
AstraSim::timespec_t sim_get_time() {
AstraSim::timespec_t timeSpec;
timeSpec.time_val = Simulator::Now().GetNanoSeconds();
return timeSpec;
}
virtual void sim_schedule(AstraSim::timespec_t delta,
void (*fun_ptr)(void *fun_arg), void *fun_arg) {
task1 t;
t.type = 2;
t.fun_arg = fun_arg;
t.msg_handler = fun_ptr;
t.schTime = delta.time_val;
Simulator::Schedule(NanoSeconds(t.schTime), t.msg_handler, t.fun_arg);
return;
}
virtual int sim_send(void *buffer,
uint64_t count,
int type,
int dst,
int tag,
AstraSim::sim_request *request,
void (*msg_handler)(void *fun_arg), void *fun_arg) {
dst += npu_offset;
task1 t;
t.src = rank;
t.dest = dst;
t.count = count;
t.type = 0;
t.fun_arg = fun_arg;
t.msg_handler = msg_handler;
{
#ifdef NS3_MTP
MtpInterface::explicitCriticalSection cs;
#endif
sentHash[make_pair(tag, make_pair(t.src, t.dest))] = t;
#ifdef NS3_MTP
cs.ExitSection();
#endif
}
SendFlow(rank, dst, count, msg_handler, fun_arg, tag, request);
return 0;
}
virtual int sim_recv(void *buffer, uint64_t count, int type, int src, int tag,
AstraSim::sim_request *request,
void (*msg_handler)(void *fun_arg), void *fun_arg) {
#ifdef NS3_MTP
MtpInterface::explicitCriticalSection cs;
#endif
MockNcclLog* NcclLog = MockNcclLog::getInstance();
AstraSim::ncclFlowTag flowTag = request->flowTag;
src += npu_offset;
task1 t;
t.src = src;
t.dest = rank;
t.count = count;
t.type = 1;
t.fun_arg = fun_arg;
t.msg_handler = msg_handler;
AstraSim::RecvPacketEventHadndlerData* ehd = (AstraSim::RecvPacketEventHadndlerData*) t.fun_arg;
AstraSim::EventType event = ehd->event;
tag = ehd->flowTag.tag_id;
NcclLog->writeLog(NcclLogLevel::DEBUG,"接收事件注册 src %d sim_recv on rank %d tag_id %d channdl id %d",src,rank,tag,ehd->flowTag.channel_id);
if (recvHash.find(make_pair(tag, make_pair(t.src, t.dest))) !=
recvHash.end()) {
uint64_t count = recvHash[make_pair(tag, make_pair(t.src, t.dest))];
if (count == t.count) {
recvHash.erase(make_pair(tag, make_pair(t.src, t.dest)));
assert(ehd->flowTag.child_flow_id == -1 && ehd->flowTag.current_flow_id == -1);
if(receiver_pending_queue.count(std::make_pair(std::make_pair(rank, src),tag))!= 0) {
AstraSim::ncclFlowTag pending_tag = receiver_pending_queue[std::make_pair(std::make_pair(rank, src),tag)];
receiver_pending_queue.erase(std::make_pair(std::make_pair(rank,src),tag));
ehd->flowTag = pending_tag;
}
#ifdef NS3_MTP
cs.ExitSection();
#endif
t.msg_handler(t.fun_arg);
goto sim_recv_end_section;
} else if (count > t.count) {
recvHash[make_pair(tag, make_pair(t.src, t.dest))] = count - t.count;
assert(ehd->flowTag.child_flow_id == -1 && ehd->flowTag.current_flow_id == -1);
if(receiver_pending_queue.count(std::make_pair(std::make_pair(rank, src),tag))!= 0) {
AstraSim::ncclFlowTag pending_tag = receiver_pending_queue[std::make_pair(std::make_pair(rank, src),tag)];
receiver_pending_queue.erase(std::make_pair(std::make_pair(rank,src),tag));
ehd->flowTag = pending_tag;
}
#ifdef NS3_MTP
cs.ExitSection();
#endif
t.msg_handler(t.fun_arg);
goto sim_recv_end_section;
} else {
recvHash.erase(make_pair(tag, make_pair(t.src, t.dest)));
t.count -= count;
expeRecvHash[make_pair(tag, make_pair(t.src, t.dest))] = t;
}
} else {
if (expeRecvHash.find(make_pair(tag, make_pair(t.src, t.dest))) ==
expeRecvHash.end()) {
expeRecvHash[make_pair(tag, make_pair(t.src, t.dest))] = t;
NcclLog->writeLog(NcclLogLevel::DEBUG," 网络包后到,先进行注册 recvHash do not find expeRecvHash.new make src %d dest %d t.count: %d channel_id %d current_flow_id %d",t.src,t.dest,t.count,tag,flowTag.current_flow_id);
} else {
uint64_t expecount =
expeRecvHash[make_pair(tag, make_pair(t.src, t.dest))].count;
NcclLog->writeLog(NcclLogLevel::DEBUG," 网络包后到,重复注册 recvHash do not find expeRecvHash.add make src %d dest %d expecount: %d t.count: %d tag_id %d current_flow_id %d",t.src,t.dest,expecount,t.count,tag,flowTag.current_flow_id);
}
}
#ifdef NS3_MTP
cs.ExitSection();
#endif
sim_recv_end_section:
return 0;
}
void handleEvent(int dst, int cnt) {
}
};
struct user_param {
int thread;
string workload;
string network_topo;
string network_conf;
user_param() {
thread = 1;
workload = "";
network_topo = "";
network_conf = "";
};
~user_param(){};
};
static int user_param_prase(int argc,char * argv[],struct user_param* user_param){
int opt;
while ((opt = getopt(argc,argv,"ht:w:g:s:n:c:"))!=-1){
switch (opt)
{
case 'h':
/* code */
std::cout<<"-t number of threads,default 1"<<std::endl;
std::cout<<"-w workloads default none "<<std::endl;
std::cout<<"-n network topo"<<std::endl;
std::cout<<"-c network_conf"<<std::endl;
return 1;
break;
case 't':
user_param->thread = stoi(optarg);
break;
case 'w':
user_param->workload = optarg;
break;
case 'n':
user_param->network_topo = optarg;
break;
case 'c':
user_param->network_conf = optarg;
break;
default:
std::cerr<<"-h help message"<<std::endl;
return 1;
}
}
return 0 ;
}
int main(int argc, char *argv[]) {
struct user_param user_param;
MockNcclLog::set_log_name("SimAI.log");
MockNcclLog* NcclLog = MockNcclLog::getInstance();
NcclLog->writeLog(NcclLogLevel::INFO," init SimAI.log ");
if(user_param_prase(argc,argv,&user_param)){
return 0;
}
#ifdef NS3_MTP
MtpInterface::Enable(user_param.thread);
#endif
main1(user_param.network_topo,user_param.network_conf);
int nodes_num = node_num - switch_num;
int gpu_num = node_num - nvswitch_num - switch_num;
std::map<int, int> node2nvswitch;
for(int i = 0; i < gpu_num; ++ i) {
node2nvswitch[i] = gpu_num + i / gpus_per_server;
}
for(int i = gpu_num; i < gpu_num + nvswitch_num; ++ i){
node2nvswitch[i] = i;
NVswitchs.push_back(i);
}
LogComponentEnable("OnOffApplication", LOG_LEVEL_INFO);
LogComponentEnable("PacketSink", LOG_LEVEL_INFO);
LogComponentEnable("GENERIC_SIMULATION", LOG_LEVEL_INFO);
std::vector<ASTRASimNetwork *> networks(nodes_num, nullptr);
std::vector<AstraSim::Sys *> systems(nodes_num, nullptr);
for (int j = 0; j < nodes_num; j++) {
networks[j] =
new ASTRASimNetwork(j ,0);
systems[j ] = new AstraSim::Sys(
networks[j],
nullptr,
j,
0,
1,
{nodes_num},
{1},
"",
user_param.workload,
1,
1,
1,
1,
0,
RESULT_PATH,
"test1",
true,
false,
gpu_type,
{gpu_num},
NVswitchs,
gpus_per_server
);
systems[j ]->nvswitch_id = node2nvswitch[j];
systems[j ]->num_gpus = nodes_num - nvswitch_num;
}
for (int i = 0; i < nodes_num; i++) {
systems[i]->workload->fire();
}
std::cout << "simulator run " << std::endl;
Simulator::Run();
Simulator::Stop(Seconds(2000000000));
Simulator::Destroy();
#ifdef NS3_MPI
MpiInterface::Disable ();
#endif
return 0;
}