astra-sim-alibabacloud/astra-sim/network_frontend/phynet/SimAiEntry.cc (99 lines of code) (raw):
/*
*Copyright (c) 2024, Alibaba Group;
*Licensed under the Apache License, Version 2.0 (the "License");
*you may not use this file except in compliance with the License.
*You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*Unless required by applicable law or agreed to in writing, software
*distributed under the License is distributed on an "AS IS" BASIS,
*WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*See the License for the specific language governing permissions and
*limitations under the License.
*/
#include"astra-sim/system/MockNcclLog.h"
#ifdef PHY_RDMA
#include"astra-sim/system/SimAiFlowModelRdma.hh"
#endif
#include"astra-sim/system/PhyMultiThread.hh"
#include"astra-sim/system/RecvPacketEventHadndlerData.hh"
#include"astra-sim/system/Common.hh"
#include"astra-sim/system/BaseStream.hh"
#include"astra-sim/system/StreamBaseline.hh"
#include"SimAiEntry.h"
using namespace std;
// RDMA flow object used by send_flow() when built with PHY_RDMA. It is only
// declared here (extern), so its definition lives in another translation unit.
// NOTE(review): SimAiFlowModelRdma.hh is included only under PHY_RDMA while
// this declaration is unconditional - confirm FlowPhyRdma is still visible in
// builds without PHY_RDMA.
extern FlowPhyRdma flow_rdma;
// System instance whose running_list supplies the stream that receives the
// send/receive completion events raised in this file. It starts as nullptr and
// is dereferenced without a null check by the notify_* helpers, so it has to be
// assigned before any completion callback runs.
AstraSim::Sys* global_sys = nullptr;
static void
notify_receiver_receive_data(int sender_node, int receiver_node,
uint64_t message_size, AstraSim::ncclFlowTag flowTag) {
AstraSim::StreamBaseline* owner = global_sys->running_list.front();
AstraSim::RecvPacketEventHadndlerData *ehd = new AstraSim::RecvPacketEventHadndlerData(owner, AstraSim::EventType::PacketReceived, flowTag);
owner->consume(ehd);
}
// Raises a PacketSentFinshed event for `flowTag` on the stream at the front of
// global_sys->running_list and writes a DEBUG log line describing the flow.
//
// sender_node, receiver_node and message_size are accepted but not read here;
// the sender/receiver/channel values all come from `flowTag`.
// The event is heap-allocated and passed to sendcallback(); it is not freed in
// this function. NOTE(review): presumably sendcallback() takes ownership -
// confirm.
static void notify_sender_sending_finished(
    int sender_node,
    int receiver_node,
    uint64_t message_size,
    AstraSim::ncclFlowTag flowTag) {
  MockNcclLog* NcclLog = MockNcclLog::getInstance();
  AstraSim::StreamBaseline* owner = global_sys->running_list.front();
  AstraSim::SendPacketEventHandlerData* send_ehd =
      new AstraSim::SendPacketEventHandlerData(
          owner,
          flowTag.sender_node,
          flowTag.receiver_node,
          flowTag.channel_id,
          AstraSim::EventType::PacketSentFinshed);
  send_ehd->flowTag = flowTag;
  // The last field is labelled flow_id, so it must log current_flow_id; the
  // previous code passed channel_id a second time.
  NcclLog->writeLog(
      NcclLogLevel::DEBUG,
      "notify_sender_sending_finished_test src %d dst %d channe_id %d flow_id %d",
      flowTag.sender_node,
      flowTag.receiver_node,
      flowTag.channel_id,
      flowTag.current_flow_id);
  owner->sendcallback(send_ehd);
}
// Receive-completion handler (registered in set_simai_network_callback()).
// Unpacks sender, receiver and flow size from `flowTag` and forwards them,
// together with the tag itself, to notify_receiver_receive_data().
static void simai_recv_finish(AstraSim::ncclFlowTag flowTag) {
  uint32_t src_node = flowTag.sender_node;
  uint32_t dst_node = flowTag.receiver_node;
  uint64_t byte_count = flowTag.flow_size;
  notify_receiver_receive_data(src_node, dst_node, byte_count, flowTag);
}
// Send-completion handler (registered in set_simai_network_callback()).
// Writes a DEBUG log line describing the finished flow, then forwards the
// flow to notify_sender_sending_finished().
static void simai_send_finish(AstraSim::ncclFlowTag flowTag) {
  uint32_t src_node = flowTag.sender_node;
  uint32_t dst_node = flowTag.receiver_node;

  // The leading Chinese text of the log message reads "packet left the NIC
  // queue".
  MockNcclLog::getInstance()->writeLog(
      NcclLogLevel::DEBUG,
      " 数据包出网卡队列, src %d did %d total_bytes %lu channel_id %d flow_id %d tag_id %d",
      src_node,
      dst_node,
      flowTag.flow_size,
      flowTag.channel_id,
      flowTag.current_flow_id,
      flowTag.tag_id);

  notify_sender_sending_finished(src_node, dst_node, flowTag.flow_size, flowTag);
}
// Installs this file's completion handlers: simai_recv_finish through
// set_receive_finished_callback() and simai_send_finish through
// set_send_finished_callback().
void set_simai_network_callback() {
  set_receive_finished_callback(&simai_recv_finish);
  set_send_finished_callback(&simai_send_finish);
}
// Builds a TransportData descriptor from request->flowTag, logs the send, and
// (only when compiled with PHY_RDMA) posts it through
// flow_rdma.simai_ibv_post_send().
//
// Parameters:
//   src, dst        - forwarded to the log line and to simai_ibv_post_send().
//   maxPacketCount  - logged as "size" and forwarded to simai_ibv_post_send().
//   msg_handler,
//   fun_arg         - accepted but not used by this function.
//   tag             - logged and forwarded as the first argument of
//                     simai_ibv_post_send().
//   request         - must be non-null; its flowTag is read unconditionally.
//
// Without PHY_RDMA the function only builds the descriptor and writes the log.
void send_flow(int src, int dst, uint64_t maxPacketCount,
               void (*msg_handler)(void *fun_arg), void *fun_arg, int tag,
               AstraSim::sim_request *request) {
  MockNcclLog* NcclLog = MockNcclLog::getInstance();
  AstraSim::ncclFlowTag flowtag = request->flowTag;
  TransportData send_data = TransportData(
      flowtag.channel_id,
      flowtag.chunk_id,
      flowtag.current_flow_id,
      flowtag.child_flow_id,
      flowtag.sender_node,
      flowtag.receiver_node,
      flowtag.flow_size,
      flowtag.pQps,
      flowtag.tag_id,
      flowtag.nvls_on);

  // Copy the child flow ids into the descriptor. A range-for avoids comparing
  // a signed index against the unsigned size() on every iteration.
  // NOTE(review): nothing here checks the capacity of child_flow_list -
  // confirm TransportData can hold tree_flow_list.size() entries.
  send_data.child_flow_size = flowtag.tree_flow_list.size();
  int child_slot = 0;
  for (const auto& child_flow : flowtag.tree_flow_list) {
    send_data.child_flow_list[child_slot] = child_flow;
    ++child_slot;
  }

  // NOTE(review): the "channelid" field below is fed `tag` - confirm callers
  // pass the channel id as the tag.
  NcclLog->writeLog(
      NcclLogLevel::DEBUG,
      "SendPackets %d SendFlow to %d channelid: %d flow_id: %d size: %lu tag_id %d",
      src,
      dst,
      tag,
      flowtag.current_flow_id,
      maxPacketCount,
      flowtag.tag_id);
#ifdef PHY_RDMA
  flow_rdma.simai_ibv_post_send(
      tag,
      src,
      dst,
      &send_data,
      sizeof(struct TransportData),
      maxPacketCount,
      flowtag.chunk_id);
#endif
}