in astra-sim-alibabacloud/astra-sim/network_frontend/ns3/entry.h [166:237]
/// Records the arrival of `message_size` from `sender_node` at `receiver_node`
/// for the flow identified by `flowTag.tag_id`.
///
/// First section (guarded by an explicit critical section under NS3_MTP):
///  - If `expeRecvHash` holds an entry for (tag, (sender, receiver)):
///      * message_size >= entry.count: the entry is erased, any surplus
///        (message_size - count) is stored in `recvHash`, `flowTag` is copied
///        into the handler's event data and `msg_handler(fun_arg)` is invoked
///        after the critical section has been exited.
///      * message_size <  entry.count: the entry's count is reduced by
///        message_size and written back.
///  - Otherwise `flowTag` is stored in `receiver_pending_queue` and
///    message_size is accumulated in `recvHash`.
///
/// Second section: message_size is added to `nodeHash[(receiver_node, 1)]`.
///
/// @param sender_node    node the data came from
/// @param receiver_node  node the data arrived at
/// @param message_size   amount of data delivered by this notification
/// @param flowTag        flow descriptor; `tag_id` selects the bookkeeping
///                       entry, the whole tag is handed to the receive handler
void notify_receiver_receive_data(int sender_node, int receiver_node,
                                  uint64_t message_size, AstraSim::ncclFlowTag flowTag) {
  {
#ifdef NS3_MTP
    MtpInterface::explicitCriticalSection cs;
#endif
    MockNcclLog* NcclLog = MockNcclLog::getInstance();
    // 64-bit quantities are logged through %llu with an explicit cast so the
    // conversion specifier always matches the variadic argument.
    const unsigned long long logged_size =
        static_cast<unsigned long long>(message_size);
    NcclLog->writeLog(NcclLogLevel::DEBUG," %d notify recevier: %d message size: %llu",sender_node,receiver_node,logged_size);

    int tag = flowTag.tag_id;
    const auto flow_key = make_pair(tag, make_pair(sender_node, receiver_node));

    auto expected = expeRecvHash.find(flow_key);
    if (expected != expeRecvHash.end()) {
      task1 t2 = expected->second;
      // t2.count is compared against message_size as an unsigned 64-bit
      // value below, so it is logged the same way.
      NcclLog->writeLog(NcclLogLevel::DEBUG," %d notify recevier: %d message size: %llu t2.count: %llu channle id: %d",sender_node,receiver_node,logged_size,static_cast<unsigned long long>(t2.count),flowTag.channel_id);
      AstraSim::RecvPacketEventHadndlerData* ehd = (AstraSim::RecvPacketEventHadndlerData*) t2.fun_arg;

      if (message_size >= t2.count) {
        // NOTE(review): both messages below label the last value "channel_id"
        // but pass `tag`; left as-is, confirm which one is intended.
        if (message_size > t2.count) {
          // Keep the surplus so a later lookup in recvHash can find it.
          recvHash[flow_key] = message_size - t2.count;
          NcclLog->writeLog(NcclLogLevel::DEBUG,"message_size > t2.count expeRecvHash.erase %d notify recevier: %d message size: %llu channel_id %d",sender_node,receiver_node,logged_size,tag);
        } else {
          NcclLog->writeLog(NcclLogLevel::DEBUG," message_size = t2.count expeRecvHash.erase %d notify recevier: %d message size: %llu channel_id %d",sender_node,receiver_node,logged_size,tag);
        }
        expeRecvHash.erase(flow_key);
        // Leave the critical section before running the handler; the goto
        // below skips the common ExitSection() so it is not exited twice.
#ifdef NS3_MTP
        cs.ExitSection();
#endif
        assert(ehd->flowTag.current_flow_id == -1 && ehd->flowTag.child_flow_id == -1);
        ehd->flowTag = flowTag;
        t2.msg_handler(t2.fun_arg);
        goto receiver_end_1st_section;
      }

      // Partial delivery: remember how much is still outstanding.
      t2.count -= message_size;
      expected->second = t2;
    } else {
      // Nothing registered for this flow yet: remember the tag and the amount.
      receiver_pending_queue[std::make_pair(std::make_pair(receiver_node, sender_node),tag)] = flowTag;
      auto buffered = recvHash.find(flow_key);
      if (buffered == recvHash.end()) {
        recvHash[flow_key] = message_size;
      } else {
        buffered->second += message_size;
      }
    }
#ifdef NS3_MTP
    cs.ExitSection();
#endif
  receiver_end_1st_section:
    {
#ifdef NS3_MTP
      MtpInterface::explicitCriticalSection cs2;
#endif
      // Per-node accumulation keyed by (receiver_node, 1).
      const auto node_key = make_pair(receiver_node, 1);
      auto node_entry = nodeHash.find(node_key);
      if (node_entry == nodeHash.end()) {
        nodeHash[node_key] = message_size;
      } else {
        node_entry->second += message_size;
      }
#ifdef NS3_MTP
      cs2.ExitSection();
#endif
    }
  }
}