astra-sim-alibabacloud/astra-sim/system/MockNcclGroup.cc (2,038 lines of code) (raw):
/*
*Copyright (c) 2024, Alibaba Group;
*Licensed under the Apache License, Version 2.0 (the "License");
*you may not use this file except in compliance with the License.
*You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*Unless required by applicable law or agreed to in writing, software
*distributed under the License is distributed on an "AS IS" BASIS,
*WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*See the License for the specific language governing permissions and
*limitations under the License.
*/
#include "MockNcclGroup.h"
#include "MockNcclChannel.h"
#include<vector>
#include<map>
#include<set>
#include <queue>
#include <cmath>
#include <algorithm>
#include "astra-sim/system/MockNcclLog.h"
using namespace std;
namespace MockNccl {
/*
 * Builds the TP, DP, EP and DP_EP communication groups for a cluster.
 *
 * _ngpus          total number of ranks
 * _gpus_per_nodes ranks per node; a rank's node is rank / _gpus_per_nodes
 * _TP_size, _DP_size, _PP_size, _EP_size, _DP_EP_size
 *                 group sizes; TP*DP*PP must equal _ngpus and EP*DP_EP must
 *                 equal _DP_size
 * _NVSwitch       NVSwitch id per node, indexed by node index
 * _gpu_type       stored in gpu_type
 *
 * Every created group is stored in AllGroups under a running index, and
 * GroupIndex maps (rank, type) and (NVSwitch id, type) to that index.
 * On invalid arguments an error is logged and no (further) group is created.
 */
MockNcclGroup::MockNcclGroup(int _ngpus,int _gpus_per_nodes,int _TP_size,int _DP_size,int _PP_size,int _EP_size,int _DP_EP_size,std::vector<int>_NVSwitch,GPUType _gpu_type):g_flow_id(0),gpu_type(_gpu_type){
  MockNcclLog *NcclLog = MockNcclLog::getInstance();
  // Validate before dividing: a zero divisor below would be undefined behaviour.
  if (_ngpus <= 0 || _gpus_per_nodes <= 0 || _ngpus % _gpus_per_nodes != 0){
    NcclLog->writeLog(NcclLogLevel::ERROR,"The number of GPUs used is not a multiple of the number of GPUs per node.");
    return;
  }
  if (_TP_size <= 0 || _DP_size <= 0 || _PP_size <= 0 || _EP_size <= 0 || _DP_EP_size <= 0){
    NcclLog->writeLog(NcclLogLevel::ERROR,"The group division method is incorrect.");
    return;
  }
  int all_group_idx = 0;
  const int nlocalranks = _gpus_per_nodes;
  const int TP_nums = _ngpus/_TP_size;
  const int DP_nums = _ngpus/_DP_size;
  const int PP_nums = _ngpus/_PP_size;
  const int EP_nums = _ngpus/_EP_size;
  const int DP_EP_nums = _ngpus/_DP_EP_size;
  if (TP_nums <= 0 || DP_nums <= 0 || PP_nums <= 0 || EP_nums <= 0 || DP_EP_nums <= 0 || (_TP_size * _DP_size * _PP_size != _ngpus) || (_EP_size * _DP_EP_size != _DP_size)){
    NcclLog->writeLog(NcclLogLevel::ERROR,"The group division method is incorrect.");
    return;
  }
  // Number of nodes a TP group spans, rounded up.
  const int nNodesPerTPGroup = _TP_size / nlocalranks + (_TP_size % nlocalranks > 0 ? 1 : 0);

  // Registers the NVSwitches of the given nodes for the group being built,
  // stores the GroupInfo and advances the running group index.
  // NOTE(review): _NVSwitch is indexed by node index without a bounds check,
  // as in the original; callers must supply one entry per node.
  auto register_group = [&](GroupType type, int group_nodes, int group_ranks,
                            std::vector<int>& members, const std::set<int>& nodes) {
    std::vector<int> switches;
    for (int node_idx : nodes) {
      switches.push_back(_NVSwitch[node_idx]);
      GroupIndex[std::make_pair(_NVSwitch[node_idx], type)] = all_group_idx;
    }
    AllGroups[all_group_idx] = GroupInfo(all_group_idx, type, group_nodes, group_ranks, members, switches);
    all_group_idx++;
  };

  std::vector<int> ranks;
  std::set<int> nodes;

  // init TP group: consecutive ranks [i*TP, (i+1)*TP)
  if (_TP_size > 1) {
    for (int i = 0; i < TP_nums; i++) {
      ranks.clear();
      nodes.clear();
      for (int j = 0; j < _TP_size; j++) {
        const int rank = i * _TP_size + j;
        ranks.push_back(rank);
        GroupIndex[std::make_pair(rank, TP)] = all_group_idx;
        nodes.insert(rank / _gpus_per_nodes);
      }
      register_group(TP, nNodesPerTPGroup, _TP_size, ranks, nodes);
    }
  }

  // init DP group: ranks strided by DP_nums
  if (_DP_size > 1) {
    for (int i = 0; i < DP_nums; i++) {
      ranks.clear();
      nodes.clear();
      for (int j = 0; j < _DP_size; j++) {
        const int rank = i + j * DP_nums;
        ranks.push_back(rank);
        GroupIndex[std::make_pair(rank, DP)] = all_group_idx;
        nodes.insert(rank / _gpus_per_nodes);
      }
      register_group(DP, static_cast<int>(nodes.size()), _DP_size, ranks, nodes);
    }
  }

  // init PP group: intentionally empty, no PP groups are created.
  if (_PP_size > 1) {
  }

  // EP and DP_EP groups are assembled from the k-th rank of several TP groups.
  std::map<int,GroupInfo> AllTPGroups;
  for (auto it = AllGroups.begin(); it != AllGroups.end(); it++) {
    if (it->second.type == TP) {
      AllTPGroups[it->second.group_index] = it->second;
    }
  }

  // init EP
  if (_EP_size > 1) {
    for (int i = 0; i < TP_nums / _EP_size; i++) {
      const int TP_idx = i * _EP_size;
      // NOTE(review): the body does not depend on j, so each EP group is
      // registered _EP_size times and GroupIndex ends up pointing at the last
      // copy. Kept as-is because removing it would renumber every later group.
      for (int j = 0; j < _EP_size; j++) {
        for (int k = 0; k < AllTPGroups[TP_idx].Ranks.size(); k++) {
          ranks.clear();
          nodes.clear();
          for (int l = TP_idx; l < TP_idx + _EP_size; l++) {
            const int tmp_rank = AllTPGroups[l].Ranks[k];
            ranks.push_back(tmp_rank);
            GroupIndex[std::make_pair(tmp_rank, EP)] = all_group_idx;
            nodes.insert(tmp_rank / _gpus_per_nodes);
          }
          register_group(EP, static_cast<int>(nodes.size()), _EP_size, ranks, nodes);
        }
      }
    }
  }

  // init EP_DP
  if (_DP_EP_size > 1) {
    for (int i = 0; i < TP_nums / _DP_EP_size; i++) {
      const int TP_idx = i;
      // NOTE(review): as above, j is unused and groups are registered repeatedly.
      for (int j = 0; j < _DP_EP_size; j++) {
        for (int k = 0; k < AllTPGroups[TP_idx].Ranks.size(); k++) {
          ranks.clear();
          nodes.clear();
          for (int l = TP_idx; l < TP_idx + _DP_EP_size * _EP_size; l += _EP_size) {
            const int tmp_rank = AllTPGroups[l].Ranks[k];
            ranks.push_back(tmp_rank);
            GroupIndex[std::make_pair(tmp_rank, DP_EP)] = all_group_idx;
            nodes.insert(tmp_rank / _gpus_per_nodes);
          }
          register_group(DP_EP, static_cast<int>(nodes.size()), _DP_EP_size, ranks, nodes);
        }
      }
    }
  }
  return;
}
// Expands every local ring in `localrings` across all nodes of `groupInfo`
// and writes, per ring id and rank, the vector {prev, next, node_recv,
// node_send} into `ringchannels`. The ring is closed at the end: the first
// rank's prev becomes the last rank, and the last rank's next the first rank.
void MockNcclGroup::generateringchannels(std::map<int, std::vector<int>> localrings, MockNccl::GroupInfo* groupInfo, std::map<int, std::map<int, std::vector<int>>>& ringchannels) {
  const int node_count = groupInfo->nNodes;
  const int ranks_per_node = groupInfo->nRanks/node_count;
  // Rank offset between the same local position on two neighbouring nodes.
  int node_stride = 0;
  if (node_count > 1) {
    node_stride = groupInfo->Ranks[ranks_per_node]-groupInfo->Ranks[0];
  }
  for (auto& local_ring : localrings) {
    const int ring_id = local_ring.first;
    const std::vector<int>& order = local_ring.second;
    int previous = -1;
    for (int node = 0; node < node_count; node++) {
      const int offset = node * node_stride;
      const int node_recv = order[0] + offset;
      const int node_send = order[ranks_per_node-1] + offset;
      for (int pos = 0; pos < ranks_per_node; pos++) {
        const int self = order[pos] + offset;
        // The last local rank hands over to the first rank of the next node.
        const int successor = (pos == ranks_per_node-1)
            ? order[0] + (node + 1) * node_stride
            : order[pos+1] + offset;
        ringchannels[ring_id][self] = {previous,successor,node_recv,node_send};
        previous = self;
      }
    }
    // Close the ring.
    const int last_rank = order[ranks_per_node-1] + (node_count - 1) * node_stride;
    ringchannels[ring_id][order[0]][0] = last_rank;
    ringchannels[ring_id][last_rank][1] = order[0];
  }
}
// Returns the local rings for the group that `rank` belongs to for `type`.
// The first (ranks / nNodes) ranks of the sorted group are taken as the local
// ranks; ring i is that list rotated left by i positions. Returns an empty map
// (after logging an error) when no group is registered for (rank, type).
std::map<int, std::vector<int>> MockNcclGroup::gen_local_ring(int rank, GroupType type){
  MockNcclLog* NcclLog = MockNcclLog::getInstance();
  const auto group_key = std::make_pair(rank,type);
  if (GroupIndex.count(group_key) == 0) {
    NcclLog->writeLog(NcclLogLevel::ERROR,"There is no relevant group info, resulting in an error in gen_local_ring");
    return {};
  }
  GroupInfo gp_info = AllGroups[GroupIndex[group_key]];
  std::vector<int> sorted_ranks = gp_info.Ranks;
  const int nlocalranks = sorted_ranks.size()/gp_info.nNodes;
  std::sort(sorted_ranks.begin(), sorted_ranks.end());
  const std::vector<int> localranks(sorted_ranks.begin(), sorted_ranks.begin() + nlocalranks);

  std::map<int,std::vector<int>> localrings;
  for (int shift = 0; shift < nlocalranks; ++shift) {
    std::vector<int> rotated(nlocalranks);
    std::rotate_copy(localranks.begin(), localranks.begin() + shift, localranks.end(), rotated.begin());
    localrings[shift] = rotated;
  }
  return localrings;
}
// Builds the ring channels of the group that `rank` belongs to for `type`,
// caches them in Allringchannels under the group index and returns them.
// Each entry maps ring id -> rank -> {prev, next, node_recv, node_send}.
// Returns an empty result (after logging an error) when no group is
// registered for (rank, type); previously execution continued and silently
// used whatever group index operator[] default-inserted.
RingChannels MockNcclGroup::genringchannels(int rank, MockNccl::GroupType type) {
  MockNcclLog* NcclLog = MockNcclLog::getInstance();
  const auto group_key = std::make_pair(rank,type);
  if (GroupIndex.count(group_key) == 0) {
    NcclLog->writeLog(NcclLogLevel::ERROR,"No corresponding group information is generated, and there is an error in creating the ring channel.");
    return {};
  }
  const int gp_idx = GroupIndex[group_key];
  GroupInfo gp_info = AllGroups[gp_idx];
  std::map<int,std::map<int,std::vector<int>>> ringchannels;
  // The expansion of local rings across nodes is shared with
  // generateringchannels instead of being duplicated here.
  generateringchannels(gen_local_ring(rank,type), &gp_info, ringchannels);
  Allringchannels[gp_idx] = ringchannels;
  return ringchannels;
}
// Returns the flow models of `rank` for the given collective, generating and
// caching them on first use.
//
// The cache key is "<type>_<group index>_<layer_num>_<loopstate>_<op>_<data_size>".
// FlowName2nums counts how many times each key has been requested.
// Returns nullptr when no group is registered for (rank, type) or when the
// generated models contain no entry for `rank`.
std::shared_ptr<void> MockNcclGroup::getFlowModels(GroupType type , int rank, AstraSim::ComType op,uint64_t data_size,int layer_num,State loopstate){
  MockNcclLog* NcclLog = MockNcclLog::getInstance();
  const auto group_key = std::make_pair(rank,type);
  if (GroupIndex.count(group_key) == 0) {
    NcclLog->writeLog(NcclLogLevel::ERROR,"There is no corresponding group info and group ring channel, resulting in an error in generating the flow model.");
    return nullptr;
  }
  const int gp_idx = GroupIndex[group_key];

  std::string flow_model_name;
  switch (type) {
    case TP:
      flow_model_name = "TP";
      break;
    case DP:
      flow_model_name = "DP";
      break;
    case EP:
      flow_model_name = "EP";
      break;
    case DP_EP:
      flow_model_name = "DP_EP";
      break;
    default:
      break;
  }
  flow_model_name = flow_model_name + "_" + std::to_string(gp_idx) + "_" + std::to_string(layer_num) + "_" + std::to_string(static_cast<int>(loopstate)) + "_" + std::to_string(static_cast<int>(op)) + "_" + std::to_string(data_size);

  if (flow_models.count(flow_model_name)) {
    FlowName2nums[flow_model_name]++;
  } else {
    flow_models[flow_model_name] = genFlowModels(type,rank,op,data_size);
    FlowName2nums[flow_model_name] = 1;
  }

  // Look the rank up without operator[] so that a rank with no flows does not
  // leave a null placeholder in the cache.
  auto& per_rank = flow_models[flow_model_name];
  auto rank_it = per_rank.find(rank);
  if (rank_it == per_rank.end()) {
    return nullptr;
  }
  return rank_it->second;
}
// Dispatches to the flow-model generator matching the collective `op`.
// Any operation other than All_Reduce, All_Gather, Reduce_Scatter and
// All_to_All yields an empty map.
std::map<int,std::shared_ptr<FlowModels>> MockNcclGroup::genFlowModels(GroupType type , int rank, AstraSim::ComType op,uint64_t data_size){
  if (op == AstraSim::ComType::All_Reduce) {
    return genAllReduceFlowModels(type,rank,data_size);
  }
  if (op == AstraSim::ComType::All_Gather) {
    return genAllGatherFlowModels(type,rank,data_size);
  }
  if (op == AstraSim::ComType::Reduce_Scatter) {
    return genReduceScatterFlowModels(type,rank,data_size);
  }
  if (op == AstraSim::ComType::All_to_All) {
    return genAlltoAllFlowModels(type,rank,data_size);
  }
  return {};
}
// Builds the flow models of an All-to-All on the group that `rank` belongs to
// for `type`: one flow of data_size / nRanks from every group rank to every
// other group rank, all on channel 0 with tag "RING". The result maps each
// rank to the flows in which it is source or destination. Returns an empty
// map (after logging an error) when no group is registered for (rank, type).
std::map<int,std::shared_ptr<FlowModels>> MockNcclGroup::genAlltoAllFlowModels(GroupType type, int rank, uint64_t data_size){
  MockNcclLog* NcclLog = MockNcclLog::getInstance();
  const auto group_key = std::make_pair(rank,type);
  if (GroupIndex.count(group_key) == 0) {
    NcclLog->writeLog(NcclLogLevel::ERROR,"There is no corresponding group info and group ring channel, resulting in an error in generating the flow model.");
    return {};
  }
  const int gp_idx = GroupIndex[group_key];
  // The ring channels are not used here; the lookup is kept because
  // operator[] may create the entry, as the original did.
  static_cast<void>(Allringchannels[gp_idx]);
  GroupInfo gp_info = AllGroups[gp_idx];

  const int nranks = gp_info.nRanks;
  const uint64_t chunksize = data_size / nranks;

  FlowModels result = {};
  const std::vector<int>& members = gp_info.Ranks;
  for (size_t sender = 0; sender < members.size(); ++sender) {
    // Every other rank of the group is recorded as a predecessor rank.
    std::vector<int> others;
    for (size_t peer = 0; peer < members.size(); ++peer) {
      if (peer != sender) {
        others.push_back(members[peer]);
      }
    }
    for (size_t receiver = 0; receiver < members.size(); ++receiver) {
      if (receiver == sender) {
        continue;
      }
      SingleFlow flow = SingleFlow(g_flow_id,members[sender],members[receiver],chunksize,others,{},{},0,0,1,"RING");
      result[std::make_pair(0, g_flow_id)] = flow;
      g_flow_id++;
    }
  }

  // Index every flow under both of its endpoints.
  std::map<int,FlowModels> rank2flowmodels;
  for (const auto& entry : result) {
    const auto key = std::make_pair(entry.first.first, entry.first.second);
    rank2flowmodels[entry.second.src][key] = entry.second;
    rank2flowmodels[entry.second.dest][key] = entry.second;
  }

  std::map<int,std::shared_ptr<FlowModels>> rank2pflowmodels;
  for (const auto& per_rank : rank2flowmodels) {
    rank2pflowmodels[per_rank.first] = std::make_shared<FlowModels>(per_rank.second);
  }
  return rank2pflowmodels;
}
// Builds the flow models of a ring Reduce-Scatter on the group that `rank`
// belongs to for `type`.
//
// For every ring channel cached in Allringchannels, a first pass creates one
// send per ring rank, followed by (nRanks - 2) further passes whose flows are
// chained to the flow recorded for the sender's predecessor in the previous
// pass. The returned map holds, per rank, the flows in which that rank is the
// source or the destination. An empty map is returned (after logging an
// error) when no group is registered for (rank, type).
//
// Setting the environment variable AS_PXN_ENABLE to "1" enables the PXN
// branches below (only taken when the group spans more than one node).
std::map<int,std::shared_ptr<FlowModels>> MockNcclGroup::genReduceScatterFlowModels(
GroupType type,
int rank,
uint64_t data_size) {
FlowModels result = {};
std::map<int,FlowModels>rank2flowmodels;
std::map<int,std::shared_ptr<FlowModels>>rank2pflowmodels;
// task_list: flow recorded per rank in the previous pass;
// task_list2: flow recorded per rank in the pass being built.
std::map<int, SingleFlow> task_list = {};
std::map<int, SingleFlow> task_list2 = {};
SingleFlow tmp_result;
uint64_t chunksize;
uint64_t send_size;
int nranks;
int chunkcount;
int chunkid;
GroupInfo gp_info;
int gp_idx;
RingChannels ringchannels;
MockNcclLog* NcclLog = MockNcclLog::getInstance();
if(GroupIndex.count(std::make_pair(rank,type))==0){
NcclLog->writeLog(NcclLogLevel::ERROR,"There is no corresponding group info and group ring channel, resulting in an error in generating the flow model.");
return {};
} else {
gp_idx = GroupIndex[std::make_pair(rank,type)];
ringchannels = Allringchannels[gp_idx];
gp_info = AllGroups[gp_idx];
}
// PXN is enabled only when AS_PXN_ENABLE is exactly "1".
bool PXN_ENABLE = false;
const char* PXN_ENV = std::getenv("AS_PXN_ENABLE");
if (PXN_ENV && strcmp(PXN_ENV, "1") == 0) {
PXN_ENABLE = true;
} else {
PXN_ENABLE = false;
}
nranks = gp_info.nRanks;
chunkcount = nranks - 1;
// From here on data_size is the share per rank and per ring channel;
// chunksize holds the same value.
chunksize = data_size / nranks / ringchannels.size();
data_size = data_size / nranks / ringchannels.size();
for (auto it = ringchannels.begin(); it != ringchannels.end(); it++) {
// ring maps rank -> vector indexed below as [0], [1], [2], [3];
// genringchannels fills it as {prev, next, node_recv, node_send}.
auto ring = it->second;
auto ring_id = it->first;
task_list = {};
send_size = 0;
chunkid = 0;
// real_chunksize only advances send_size; the flows below are sized with
// data_size. Because chunksize == data_size, the first iteration already
// covers the whole share.
while (send_size < data_size) {
uint64_t real_chunksize = std::min(chunksize, data_size - send_size);
int prenoderecvrank = ring.rbegin()->second[2];
int prenodesendrank = ring.rbegin()->second[3];
int curnoderecvrank = ring.begin()->second[2];
int curnodesendrank = ring.begin()->second[3];
std::vector<int> prevranks = {};
// First pass: flows without parent flows.
for (auto rank_it = ring.begin(); rank_it != ring.end(); rank_it++) {
int cur_rank = rank_it->first;
// When both node_recv and node_send differ from the tracked pair, the
// tracked "current" pair becomes the "previous" pair.
if (curnoderecvrank != rank_it->second[2] &&
curnodesendrank != rank_it->second[3]) {
prenoderecvrank = curnoderecvrank;
prenodesendrank = curnodesendrank;
curnoderecvrank = rank_it->second[2];
curnodesendrank = rank_it->second[3];
}
if (rank_it->second[3] == cur_rank &&
rank_it->second[2] != cur_rank && gp_info.nNodes > 1 &&
PXN_ENABLE) {
// cur_rank is its entry's node_send rank but not its node_recv rank:
// emit two linked flows, cur_rank -> node_recv and node_recv -> next.
prevranks.clear();
if (rank_it->second[0] != -1)
prevranks = {rank_it->second[0]};
tmp_result = SingleFlow(
g_flow_id,
rank_it->first,
rank_it->second[2],
data_size,
prevranks,
{},
{g_flow_id + 1},
ring_id,
chunkid,
chunkcount,
"RING");
result[std::make_pair(ring_id, g_flow_id)] = tmp_result;
g_flow_id++;
if (rank_it->first != -1) {
prevranks = {rank_it->first};
} else {
prevranks = {};
}
tmp_result = SingleFlow(
g_flow_id,
rank_it->second[2],
rank_it->second[1],
data_size,
prevranks,
{g_flow_id - 1},
{},
ring_id,
chunkid,
chunkcount,
"PXN_INIT");
result[std::make_pair(ring_id, g_flow_id)] = tmp_result;
// Recorded under the originating rank, although the flow's source is
// the node_recv rank.
task_list[rank_it->first] = tmp_result;
g_flow_id++;
} else if (
rank_it->second[2] == cur_rank &&
rank_it->second[3] != cur_rank && gp_info.nNodes > 1 &&
PXN_ENABLE) {
// cur_rank is its entry's node_recv rank but not its node_send rank:
// the predecessor rank is the tracked previous node_recv rank.
prevranks.clear();
if(prenoderecvrank!=-1){
prevranks = {prenoderecvrank};
}
tmp_result = SingleFlow(
g_flow_id,
rank_it->first,
rank_it->second[1],
data_size,
prevranks,
{},
{},
ring_id,
chunkid,
chunkcount,
"RING");
result[std::make_pair(ring_id, g_flow_id)] = tmp_result;
task_list[rank_it->first] = tmp_result;
g_flow_id++;
} else {
// Plain ring step: cur_rank -> next, predecessor rank is prev.
prevranks.clear();
if(rank_it->second[0]!=-1){
prevranks = {rank_it->second[0]};
}
tmp_result = SingleFlow(
g_flow_id,
rank_it->first,
rank_it->second[1],
data_size,
prevranks,
{},
{},
ring_id,
chunkid,
chunkcount,
"RING");
result[std::make_pair(ring_id, g_flow_id)] = tmp_result;
task_list[rank_it->first] = tmp_result;
g_flow_id++;
}
}
chunkid++;
// Remaining nranks - 2 passes: each flow names as parent the flow recorded
// for its prev rank in the previous pass, and is appended to that parent's
// child_flow_id list.
for (int i = 0; i < nranks - 2; i++) {
task_list2 = {};
prenoderecvrank = ring.rbegin()->second[2];
prenodesendrank = ring.rbegin()->second[3];
curnoderecvrank = ring.begin()->second[2];
curnodesendrank = ring.begin()->second[3];
for (auto rank_it = ring.begin(); rank_it != ring.end(); rank_it++) {
if (curnoderecvrank != rank_it->second[2] &&
curnodesendrank != rank_it->second[3]) {
prenoderecvrank = curnoderecvrank;
prenodesendrank = curnodesendrank;
curnoderecvrank = rank_it->second[2];
curnodesendrank = rank_it->second[3];
}
int cur_rank = rank_it->first;
// Flow recorded for this rank's prev in the previous pass.
int partner_flow_id = task_list[rank_it->second[0]].flow_id;
if (rank_it->second[3] == cur_rank &&
rank_it->second[2] != cur_rank && gp_info.nNodes > 1 &&
PXN_ENABLE) {
// Same two-flow PXN shape as in the first pass; here both flows are
// tagged "RING" and the first one depends on partner_flow_id.
prevranks.clear();
if (rank_it->second[0] != -1) {
prevranks = {rank_it->second[0]};
}
tmp_result = SingleFlow(
g_flow_id,
rank_it->first,
rank_it->second[2],
data_size,
prevranks,
{partner_flow_id},
{g_flow_id + 1},
ring_id,
chunkid,
chunkcount,
"RING");
result[std::make_pair(ring_id, partner_flow_id)].child_flow_id.push_back(g_flow_id);
result[std::make_pair(ring_id, g_flow_id)] = tmp_result;
g_flow_id++;
if(rank_it->first!=-1){
prevranks={rank_it->first};
}else{
prevranks ={};
}
tmp_result = SingleFlow(
g_flow_id,
rank_it->second[2],
rank_it->second[1],
data_size,
prevranks,
{g_flow_id - 1},
{},
ring_id,
chunkid,
chunkcount,
"RING");
task_list2[rank_it->first] = tmp_result;
result[std::make_pair(ring_id, g_flow_id)] = tmp_result;
g_flow_id++;
} else if (
rank_it->second[2] == cur_rank &&
rank_it->second[3] != cur_rank && gp_info.nNodes > 1 &&
PXN_ENABLE) {
prevranks.clear();
if (prenoderecvrank != -1) {
prevranks = {prenoderecvrank};
}
tmp_result = SingleFlow(
g_flow_id,
rank_it->first,
rank_it->second[1],
data_size,
prevranks,
{partner_flow_id},
{},
ring_id,
chunkid,
chunkcount,
"RING");
result[std::make_pair(ring_id, partner_flow_id)].child_flow_id .push_back(g_flow_id);
task_list2[rank_it->first] = tmp_result;
result[std::make_pair(ring_id, g_flow_id)] = tmp_result;
g_flow_id++;
} else {
prevranks.clear();
if(rank_it->second[0]!=-1){
prevranks= {rank_it->second[0]};
}
tmp_result = SingleFlow(
g_flow_id,
rank_it->first,
rank_it->second[1],
data_size,
prevranks,
{partner_flow_id},
{},
ring_id,
chunkid,
chunkcount,
"RING");
result[std::make_pair(ring_id, partner_flow_id)].child_flow_id.push_back(g_flow_id);
task_list2[rank_it->first] = tmp_result;
result[std::make_pair(ring_id, g_flow_id)] = tmp_result;
g_flow_id++;
}
}
// The pass just built becomes the parent pass of the next one.
task_list = task_list2;
chunkid++;
}
send_size += real_chunksize;
}
}
// Index every flow under both its source and its destination rank.
for(auto flow_models_it = result.begin();flow_models_it!=result.end();flow_models_it++){
int src = flow_models_it->second.src;
int dst = flow_models_it->second.dest;
rank2flowmodels[src][std::make_pair(flow_models_it->first.first,flow_models_it->first.second)]=flow_models_it->second;
rank2flowmodels[dst][std::make_pair(flow_models_it->first.first,flow_models_it->first.second)]=flow_models_it->second;
}
for(auto it = rank2flowmodels.begin();it!=rank2flowmodels.end();it++){
rank2pflowmodels[it->first] = std::make_shared<FlowModels>(it->second);
}
return rank2pflowmodels;
}
// Generates the All-Reduce flow models using the algorithm reported by
// get_algo_proto_info: TREE and RING both use the ring generator, NVLS uses
// the NVLS generator. NVLS_TREE and any other algorithm yield an empty map.
// An empty map is also returned (after logging an error) when no algorithm
// information is available.
std::map<int,std::shared_ptr<FlowModels>> MockNcclGroup::genAllReduceFlowModels(GroupType type , int rank,uint64_t data_size){
  ncclInfo* ncc_info = get_algo_proto_info(type,rank,AstraSim::ComType::All_Reduce,data_size);
  if (ncc_info == nullptr) {
    MockNcclLog* NcclLog = MockNcclLog::getInstance();
    NcclLog->writeLog(NcclLogLevel::ERROR,"There is no algorithm info, resulting in an error in genAllReduceFlowModels.");
    return {};
  }
  switch (ncc_info->algorithm) {
    case NCCL_ALGO_TREE:
    case NCCL_ALGO_RING:
      return genAllReduceRingFlowModels(type, rank, data_size);
    case NCCL_ALGO_NVLS:
      return genAllreduceNVLSFlowModels(type,rank,data_size);
    case NCCL_ALGO_NVLS_TREE:
      return {};
    default:
      break;
  }
  // Previously control fell off the end of this non-void function here,
  // which is undefined behaviour.
  return {};
}
// Builds the All-Reduce flow models over the NVLS tree channels cached in
// AllNVLStreechannels for the group that `rank` belongs to for `type`.
// For every channel and chunk the up (reduce) flows are generated first,
// then the down (broadcast) flows, all into one FlowModels.
// Returns nullptr (after logging an error) when no group is registered for
// (rank, type) or when the group has no NVLS tree channel; the latter used to
// divide by zero.
std::shared_ptr<FlowModels> MockNcclGroup::genallReduceNVLSTreeFlowModels(
    GroupType type,
    int rank,
    uint64_t data_size) {
  MockNcclLog* NcclLog = MockNcclLog::getInstance();
  const int chunk_count = 1;
  FlowModels result = {};
  if(GroupIndex.count(std::make_pair(rank,type)) == 0){
    NcclLog->writeLog(NcclLogLevel::ERROR,"There is no relevant group info, resulting in an error in generating genallReduceNVLSTreeFlowModels.");
    return nullptr;
  }
  const int gp_idx = GroupIndex[std::make_pair(rank,type)];
  NVLStreechannels nvlstreechannels = AllNVLStreechannels[gp_idx];
  // size() is a size_t; cast so the argument matches the %d conversion.
  NcclLog->writeLog(NcclLogLevel::DEBUG," nvlstreechannels.size() %d",static_cast<int>(nvlstreechannels.size()));
  if (nvlstreechannels.size() == 0) {
    NcclLog->writeLog(NcclLogLevel::ERROR,"There is no NVLS tree channel for the group, resulting in an error in generating genallReduceNVLSTreeFlowModels.");
    return nullptr;
  }
  const int chunk_size = data_size / nvlstreechannels.size() / chunk_count;
  for (NVLStreechannels::iterator nvlstree = nvlstreechannels.begin();
       nvlstree != nvlstreechannels.end();
       nvlstree++) {
    // Dump the tree once, from rank 0 only.
    if (rank == 0) {
      std::map<int, std::vector<ncclChannelNode*>>::iterator nvlstreenodes_it;
      for (nvlstreenodes_it = nvlstree->second.begin();
           nvlstreenodes_it != nvlstree->second.end();
           nvlstreenodes_it++) {
        NcclLog->writeLog(NcclLogLevel::DEBUG," rank %d nvls tree nodes ",nvlstreenodes_it->first);
        int i = 0;
        for (auto nvlstreenode : nvlstreenodes_it->second) {
          NcclLog->writeLog(NcclLogLevel::DEBUG," node %d rank %d",i,nvlstreenode->rank);
          if(nvlstreenode->up!=nullptr){
            NcclLog->writeLog(NcclLogLevel::DEBUG," up %d",nvlstreenode->up->rank);
          }
          NcclLog->writeLog(NcclLogLevel::DEBUG," down ");
          for (auto down : nvlstreenode->down) {
            NcclLog->writeLog(NcclLogLevel::DEBUG,"%d ",down->rank);
          }
          // The node index was never advanced before, so every node logged as 0.
          i++;
        }
      }
    }
    std::unordered_map<ncclChannelNode*, int> upinDegree;
    std::unordered_map<ncclChannelNode*, int> downinDegree;
    std::unordered_map<ncclChannelNode*, std::vector<int>> nodeprevs;
    for (int ck = 0; ck < chunk_count; ck++) {
      nodeprevs = {};
      std::vector<ncclChannelNode*> ncclchannelnodes;
      for (auto& nvlstreenodes : nvlstree->second) {
        for (auto nvlstreenode : nvlstreenodes.second) {
          ncclchannelnodes.push_back(nvlstreenode);
          // Up pass: a node is ready once all of its children have sent.
          upinDegree[nvlstreenode] = nvlstreenode->down.size();
          // Down pass: only nodes without a parent start ready.
          if (nvlstreenode->up == nullptr)
            downinDegree[nvlstreenode] = 0;
          else
            downinDegree[nvlstreenode] = 1;
        }
      }
      generate_flow_model_nvls_tree_allreduce_up(
          ncclchannelnodes,
          upinDegree,
          nodeprevs,
          chunk_size,
          ck,
          chunk_count,
          nvlstree->first,
          result);
      generate_flow_model_nvls_tree_allreduce_down(
          ncclchannelnodes,
          downinDegree,
          nodeprevs,
          chunk_size,
          ck,
          chunk_count,
          nvlstree->first,
          result);
    }
  }
  std::shared_ptr<FlowModels> ptr_result =
      std::make_shared<FlowModels>(result);
  return ptr_result;
}
// Up pass of the NVLS tree All-Reduce: starting from the nodes whose
// in-degree is zero, emits one flow node -> node->up per node that has a
// parent, in topological order. `upinDegree` is a by-value working copy.
// `nodeprevs` collects, per node, the ids of the flows that arrived at it;
// those become the parent flows of the node's own send. New flows are stored
// in `result` under (channle_id, flow id) and linked into their parents'
// child_flow_id lists. Returns a copy of `result`.
FlowModels MockNcclGroup::generate_flow_model_nvls_tree_allreduce_up(
    std::vector<ncclChannelNode*> nvlstreenodes,
    std::unordered_map<ncclChannelNode*, int> upinDegree,
    std::unordered_map<ncclChannelNode*, std::vector<int>>& nodeprevs,
    int chunk_size,
    int chunk_id,
    int chunk_count,
    int channle_id,
    FlowModels& result) {
  const std::string conn_tag = "NVLS_TREE";
  std::queue<ncclChannelNode*> ready;
  for (const auto& degree : upinDegree) {
    if (degree.second != 0) {
      continue;
    }
    ready.push(degree.first);
    nodeprevs[degree.first] = {};
  }
  while (!ready.empty()) {
    ncclChannelNode* node = ready.front();
    ready.pop();
    ncclChannelNode* parent = node->up;
    if (parent == nullptr) {
      continue;  // the root sends nothing upwards
    }
    upinDegree[parent]--;
    // Predecessor ranks: the children if there are any, otherwise the parent.
    std::vector<int> prev_ranks;
    if (node->down.size() == 0) {
      prev_ranks.push_back(parent->rank);
    } else {
      for (auto child : node->down) {
        prev_ranks.push_back(child->rank);
      }
    }
    SingleFlow flow = SingleFlow(
        g_flow_id,
        node->rank,
        parent->rank,
        chunk_size,
        prev_ranks,
        nodeprevs[node],
        {},
        channle_id,
        chunk_id,
        chunk_count,
        conn_tag);
    for (int parent_flow_id : nodeprevs[node]) {
      result[std::make_pair(channle_id, parent_flow_id)]
          .child_flow_id.push_back(g_flow_id);
    }
    result[std::make_pair(channle_id, g_flow_id)] = flow;
    g_flow_id++;
    nodeprevs[parent].push_back(flow.flow_id);
    nodeprevs.erase(node);
    if (upinDegree[parent] == 0) {
      ready.push(parent);
    }
  }
  return result;
}
// Down pass of the NVLS tree All-Reduce: starting from the nodes whose
// in-degree is zero, emits one flow node -> child for every child, in
// topological order. `downinDegree` is a by-value working copy. The flows
// recorded in `nodeprevs` for a node are the parent flows of its sends, and
// each new flow id is appended to the receiving child's `nodeprevs` entry.
// New flows are stored in `result` under (channle_id, flow id). Returns a
// copy of `result`.
FlowModels MockNcclGroup::generate_flow_model_nvls_tree_allreduce_down(
    std::vector<ncclChannelNode*> nvlstreenodes,
    std::unordered_map<ncclChannelNode*, int> downinDegree,
    std::unordered_map<ncclChannelNode*, std::vector<int>>& nodeprevs,
    int chunk_size,
    int chunk_id,
    int chunk_count,
    int channle_id,
    FlowModels& result) {
  const std::string conn_tag = "NVLS_TREE";
  std::queue<ncclChannelNode*> ready;
  for (const auto& degree : downinDegree) {
    if (degree.second == 0) {
      ready.push(degree.first);
    }
  }
  while (!ready.empty()) {
    ncclChannelNode* node = ready.front();
    ready.pop();
    if (!(node->down.size() > 0)) {
      continue;  // leaves have nobody to forward to
    }
    for (ncclChannelNode* child : node->down) {
      downinDegree[child]--;
      // Predecessor ranks: all children for a root, otherwise the parent.
      std::vector<int> prev_ranks;
      if (node->up != nullptr) {
        prev_ranks.push_back(node->up->rank);
      } else {
        for (ncclChannelNode* sibling : node->down) {
          prev_ranks.push_back(sibling->rank);
        }
      }
      SingleFlow flow = SingleFlow(
          g_flow_id,
          node->rank,
          child->rank,
          chunk_size,
          prev_ranks,
          nodeprevs[node],
          {},
          channle_id,
          chunk_id,
          chunk_count,
          conn_tag);
      for (int parent_flow_id : nodeprevs[node]) {
        result[std::make_pair(channle_id, parent_flow_id)]
            .child_flow_id.push_back(g_flow_id);
      }
      result[std::make_pair(channle_id, g_flow_id)] = flow;
      g_flow_id++;
      nodeprevs[child].push_back(flow.flow_id);
      if (downinDegree[child] == 0) {
        ready.push(child);
      }
    }
  }
  return result;
}
// Builds the NVLS All-Reduce flow models for the group that `rank` belongs to
// for `type`. The payload is split into 4 chunks. Per chunk and NVSwitch,
// every group rank first sends its chunk to the switch, then the switch sends
// to every rank, with each switch->rank flow depending on all rank->switch
// flows of that round. All flows use channel 0 and tag "NVLS".
//
// Flows are only generated for single-node groups; for a multi-node group
// the returned map is empty. An empty map is also returned (after logging an
// error) when no group is registered for (rank, type).
std::map<int,std::shared_ptr<FlowModels>> MockNcclGroup::genAllreduceNVLSFlowModels(GroupType type,int rank,uint64_t data_size){
  const int chunk_count = 4;
  MockNcclLog* NcclLog = MockNcclLog::getInstance();
  if(GroupIndex.count(std::make_pair(rank,type))==0){
    NcclLog->writeLog(NcclLogLevel::ERROR,"There is no corresponding group info , resulting in an error in genAllreduceNVLSFlowModels.");
    return {};
  }
  const int gp_idx = GroupIndex[std::make_pair(rank,type)];
  GroupInfo gp_info = AllGroups[gp_idx];
  FlowModels result={};
  if(gp_info.nNodes == 1){
    const std::vector<int>& NVswitchs = gp_info.NVSwitchs;
    const std::vector<int>& ranks = gp_info.Ranks;
    // Kept as uint64_t: storing the quotient in an int truncated payloads of
    // 8 GiB and above. NOTE(review): assumes SingleFlow takes a 64-bit size,
    // as the other generators pass uint64_t in this position - confirm.
    const uint64_t chunk_size = data_size / chunk_count;
    for(int ck =0;ck<chunk_count;ck++){
      for(int nvswitch : NVswitchs){
        std::vector<int>prevs;
        std::vector<int>parents;
        // Reduce: every rank -> switch.
        for(int member : ranks){
          result[std::make_pair(0,g_flow_id)] = SingleFlow(g_flow_id,member,nvswitch,chunk_size,{nvswitch},{},{},0,ck,chunk_count,"NVLS");
          prevs.push_back(member);
          parents.push_back(g_flow_id);
          g_flow_id++;
        }
        // Broadcast: switch -> every rank, gated on all reduce flows above.
        for(int member : ranks){
          result[std::make_pair(0,g_flow_id)] = SingleFlow(g_flow_id,nvswitch,member,chunk_size,prevs,parents,{},0,ck,chunk_count,"NVLS");
          for(int parent : parents){
            result[std::make_pair(0,parent)].child_flow_id.push_back(g_flow_id);
          }
          g_flow_id++;
        }
      }
    }
  }
  // Index every flow under both its source and its destination.
  std::map<int,FlowModels>rank2flowmodels;
  for(const auto& entry : result){
    const auto key = std::make_pair(entry.first.first,entry.first.second);
    rank2flowmodels[entry.second.src][key]=entry.second;
    rank2flowmodels[entry.second.dest][key]=entry.second;
  }
  std::map<int,std::shared_ptr<FlowModels>>rank2pflowmodels;
  for(const auto& per_rank : rank2flowmodels){
    rank2pflowmodels[per_rank.first] = std::make_shared<FlowModels>(per_rank.second);
  }
  return rank2pflowmodels;
}
// Builds the tree All-Reduce flow models over the tree channels cached in
// Alltreechannels for the group that `rank` belongs to for `type`.
// The payload is split evenly over the channels and into 64 chunks; per
// channel and chunk the up (reduce) flows are generated first, then the down
// (broadcast) flows.
// Returns nullptr (after logging an error) when no group is registered for
// (rank, type) or when the group has no tree channel.
std::shared_ptr<FlowModels> MockNcclGroup::genAllReduceTreeFlowModels(GroupType type , int rank,uint64_t data_size){
  const int chunk_count = 64;
  FlowModels result = {};
  MockNcclLog* NcclLog = MockNcclLog::getInstance();
  // Resolve the group index before it is used to look up the tree channels;
  // the original consulted Alltreechannels with an uninitialized index.
  if(GroupIndex.count(std::make_pair(rank,type))==0){
    NcclLog->writeLog(NcclLogLevel::ERROR,"There is no corresponding group info , resulting in an error in genAllReduceTreeFlowModels.");
    return nullptr;
  }
  const int gp_idx = GroupIndex[std::make_pair(rank,type)];
  if(Alltreechannels.count(gp_idx)==0){
    NcclLog->writeLog(NcclLogLevel::ERROR,"There is no corresponding group info , resulting in an error in genAllReduceTreeFlowModels.");
    return nullptr;
  }
  // Work on a copy: the helpers index the node map with operator[].
  TreeChannels treechannels = Alltreechannels[gp_idx];
  if(treechannels.size() == 0){
    // Avoids dividing by zero below.
    NcclLog->writeLog(NcclLogLevel::ERROR,"There is no tree channel for the group, resulting in an error in genAllReduceTreeFlowModels.");
    return nullptr;
  }
  const int chunk_size = data_size / treechannels.size() / chunk_count;
  for(auto& tree : treechannels) {
    std::unordered_map<int, int> upinDegree;
    std::unordered_map<int, int> downinDegree;
    std::unordered_map<int,std::vector<int>> nodeprevs;
    for(int ck = 0; ck < chunk_count; ck++){
      nodeprevs = {};
      for(const auto& treenode : tree.second){
        // Up pass: a node is ready once all of its children have sent.
        upinDegree[treenode.first] = treenode.second.down.size();
        // Down pass: only the node without a parent (up == -1) starts ready.
        if(treenode.second.up == -1)
          downinDegree[treenode.first] = 0;
        else
          downinDegree[treenode.first] = 1;
      }
      generate_flow_model_tree_allreduce_up(tree.second,upinDegree,nodeprevs,chunk_size,ck,chunk_count,tree.first,result);
      generate_flow_model_tree_allreduce_down(tree.second,downinDegree,nodeprevs,chunk_size,ck,chunk_count,tree.first,result);
    }
  }
  std::shared_ptr<FlowModels> ptr_result = std::make_shared<FlowModels>(result);
  return ptr_result;
}
// Up pass of the tree All-Reduce: starting from the ranks whose in-degree is
// zero, emits one flow rank -> up for every node with a parent (up != -1), in
// topological order. `upinDegree` is a by-value working copy. `nodeprevs`
// collects, per rank, the ids of the flows that arrived at it; those become
// the parent flows of the rank's own send. New flows are stored in `result`
// under (channle_id, flow id) with tag "TREE_INIT". Returns a copy of `result`.
FlowModels MockNcclGroup::generate_flow_model_tree_allreduce_up(std::map<int,ncclTree> &nodes,std::unordered_map<int, int> upinDegree,std::unordered_map<int,std::vector<int>>& nodeprevs,int chunk_size,int chunk_id,int chunk_count,int channle_id,FlowModels& result){
  const std::string conn_tag = "TREE_INIT";
  std::queue<ncclTree> pending;
  for (const auto& degree : upinDegree) {
    if (degree.second != 0) {
      continue;
    }
    pending.push(nodes[degree.first]);
    nodeprevs[degree.first] = {};
  }
  while (!pending.empty()) {
    ncclTree node = pending.front();
    pending.pop();
    if (node.up == -1) {
      continue;  // the root sends nothing upwards
    }
    upinDegree[node.up]--;
    // Predecessor ranks: the children if there are any, otherwise the parent.
    std::vector<int> prev_ranks;
    if (node.down.size() == 0) {
      prev_ranks = {node.up};
    } else {
      prev_ranks = node.down;
    }
    SingleFlow flow = SingleFlow(g_flow_id,node.rank,node.up,chunk_size,prev_ranks,nodeprevs[node.rank],{},channle_id,chunk_id,chunk_count,conn_tag);
    for (int parent_flow_id : nodeprevs[node.rank]) {
      result[std::make_pair(channle_id,parent_flow_id)].child_flow_id.push_back(g_flow_id);
    }
    result[std::make_pair(channle_id,g_flow_id)] = flow;
    g_flow_id++;
    nodeprevs[node.up].push_back(flow.flow_id);
    nodeprevs.erase(node.rank);
    if (upinDegree[node.up] == 0) {
      pending.push(nodes[node.up]);
    }
  }
  return result;
}
// Down pass of the tree All-Reduce: starting from the ranks whose in-degree
// is zero, emits one flow rank -> child for every child, in topological
// order. `downinDegree` is a by-value working copy. The flows recorded in
// `nodeprevs` for a rank are the parent flows of its sends, and each new flow
// id is appended to the receiving child's `nodeprevs` entry. New flows are
// stored in `result` under (channle_id, flow id) with tag "TREE_INIT".
// Returns a copy of `result`.
FlowModels MockNcclGroup::generate_flow_model_tree_allreduce_down(std::map<int,ncclTree> &nodes,std::unordered_map<int, int> downinDegree,std::unordered_map<int,std::vector<int>>& nodeprevs,int chunk_size,int chunk_id,int chunk_count,int channle_id,FlowModels& result){
  const std::string conn_tag = "TREE_INIT";
  std::queue<ncclTree> pending;
  for (const auto& degree : downinDegree) {
    if (degree.second == 0) {
      pending.push(nodes[degree.first]);
    }
  }
  while (!pending.empty()) {
    ncclTree node = pending.front();
    pending.pop();
    if (!(node.down.size() > 0)) {
      continue;  // leaves have nobody to forward to
    }
    for (int child : node.down) {
      downinDegree[child]--;
      // Predecessor ranks: all children for the root, otherwise the parent.
      std::vector<int> prev_ranks;
      if (node.up != -1) {
        prev_ranks = {node.up};
      } else {
        prev_ranks = node.down;
      }
      SingleFlow flow = SingleFlow(g_flow_id,node.rank,child,chunk_size,prev_ranks,nodeprevs[node.rank],{},channle_id,chunk_id,chunk_count,conn_tag);
      for (int parent_flow_id : nodeprevs[node.rank]) {
        result[std::make_pair(channle_id,parent_flow_id)].child_flow_id.push_back(g_flow_id);
      }
      result[std::make_pair(channle_id,g_flow_id)] = flow;
      g_flow_id++;
      nodeprevs[child].push_back(flow.flow_id);
      if (downinDegree[child] == 0) {
        pending.push(nodes[child]);
      }
    }
  }
  return result;
}
/**
 * Generates the ring all-reduce flow model for the group that `rank` belongs
 * to for group type `type`.
 *
 * `data_size` is divided by the number of ranks and by the number of ring
 * channels; every generated flow carries that share. Each ring channel is
 * walked for 2*(nRanks-1) steps (presumably reduce-scatter followed by
 * all-gather - the code itself does not distinguish the two). In every step
 * each rank of the ring emits one flow, or two when the PXN relay applies, and
 * every flow after the first step is chained to the flow produced in the
 * previous step by the rank stored at index 0 of the sender's ring entry.
 *
 * Ring entry layout as used here, per rank: [0] rank whose previous-step flow
 * this rank depends on, [1] destination rank, [2] / [3] the ranks this code
 * refers to as "node recv rank" / "node send rank".
 *
 * PXN relaying is active only when the environment variable AS_PXN_ENABLE is
 * exactly "1" and the group spans more than one node.
 *
 * @return flow models keyed by rank (a flow is listed under both its source
 *         and its destination); empty when the group is unknown, has no ranks
 *         or has no ring channels.
 */
std::map<int,std::shared_ptr<FlowModels>> MockNcclGroup::genAllReduceRingFlowModels(GroupType type , int rank,uint64_t data_size){
  MockNcclLog* NcclLog = MockNcclLog::getInstance();
  if(GroupIndex.count(std::make_pair(rank,type))==0){
    NcclLog->writeLog(NcclLogLevel::ERROR,"There is no corresponding group info and group ring channel, resulting in an error in generating the flow model.");
    return {};
  }
  const int gp_idx = GroupIndex[std::make_pair(rank,type)];
  RingChannels ringchannels = Allringchannels[gp_idx];
  GroupInfo gp_info = AllGroups[gp_idx];
  const int nranks = gp_info.nRanks;
  // Both values are used as divisors below.
  if (nranks <= 0 || ringchannels.empty()) {
    NcclLog->writeLog(NcclLogLevel::ERROR,"The group has no ranks or no ring channels, so no all-reduce flow model can be generated.");
    return {};
  }

  const char* PXN_ENV = std::getenv("AS_PXN_ENABLE");
  const bool PXN_ENABLE = PXN_ENV && strcmp(PXN_ENV, "1") == 0;
  const bool use_pxn = PXN_ENABLE && gp_info.nNodes > 1;

  // Per-rank, per-channel share; chunk size and total to send are the same
  // value, so the send loop below runs at most once per channel.
  const uint64_t chunksize = data_size / nranks / ringchannels.size();
  data_size = chunksize;
  const int chunkcout = 2*(nranks-1);

  FlowModels result = {};
  for(auto it = ringchannels.begin(); it !=ringchannels.end(); it++) {
    auto ring = it->second;
    auto ring_id = it->first;
    if (ring.empty()) {
      continue;
    }

    // Emits the flows of one ring step for every rank of `ring`.
    // `prev_step` maps rank -> flow emitted by that rank in the previous step;
    // nullptr marks the first step, whose flows have no parent flow.
    // Returns rank -> last flow emitted for that rank in this step.
    auto emit_step = [&](std::map<int,SingleFlow>* prev_step, int chunk_id) -> std::map<int,SingleFlow> {
      std::map<int,SingleFlow> emitted;
      int prenoderecvrank = ring.rbegin()->second[2];
      int curnoderecvrank = ring.begin()->second[2];
      int curnodesendrank = ring.begin()->second[3];
      for (auto rank_it = ring.begin(); rank_it != ring.end(); rank_it++) {
        const int cur_rank = rank_it->first;
        auto& conn = rank_it->second;
        // Moving on to a rank with a different recv/send pair: remember the
        // recv rank of the pair we just left.
        if (curnoderecvrank != conn[2] && curnodesendrank != conn[3]) {
          prenoderecvrank = curnoderecvrank;
          curnoderecvrank = conn[2];
          curnodesendrank = conn[3];
        }

        const bool has_parent = prev_step != nullptr;
        int partner_flow_id = -1;
        std::vector<int> parent_ids;
        if (has_parent) {
          // operator[] is kept on purpose: a missing entry yields a
          // default-constructed flow, exactly as before.
          partner_flow_id = (*prev_step)[conn[0]].flow_id;
          parent_ids.push_back(partner_flow_id);
        }

        // This rank is the send rank but not the recv rank: relay through the
        // recv rank first (two flows).
        const bool relay_via_proxy = use_pxn && conn[3] == cur_rank && conn[2] != cur_rank;
        // This rank is the recv rank but not the send rank.
        const bool is_recv_rank = use_pxn && conn[2] == cur_rank && conn[3] != cur_rank;

        const int prev_rank = is_recv_rank ? prenoderecvrank : conn[0];
        std::vector<int> prevranks;
        if (prev_rank != -1) {
          prevranks = {prev_rank};
        }
        std::vector<int> child_ids;
        if (relay_via_proxy) {
          child_ids.push_back(g_flow_id + 1);
        }

        SingleFlow flow = SingleFlow(
            g_flow_id,
            cur_rank,
            relay_via_proxy ? conn[2] : conn[1],
            data_size,
            prevranks,
            parent_ids,
            child_ids,
            ring_id,
            chunk_id,
            chunkcout,
            "RING");
        if (has_parent) {
          result[std::make_pair(ring_id, partner_flow_id)].child_flow_id.push_back(g_flow_id);
        }
        result[std::make_pair(ring_id, g_flow_id)] = flow;
        g_flow_id++;

        if (relay_via_proxy) {
          // Second hop: the recv rank forwards to the real destination.
          std::vector<int> proxy_prev;
          if (cur_rank != -1) {
            proxy_prev = {cur_rank};
          }
          flow = SingleFlow(
              g_flow_id,
              conn[2],
              conn[1],
              data_size,
              proxy_prev,
              {g_flow_id - 1},
              {},
              ring_id,
              chunk_id,
              chunkcout,
              "PXN_INIT");
          result[std::make_pair(ring_id, g_flow_id)] = flow;
          g_flow_id++;
        }
        emitted[cur_rank] = flow;
      }
      return emitted;
    };

    uint64_t send_size = 0;
    int chunk_id = 0;
    while (send_size < data_size) {
      const uint64_t real_chunksize = std::min(chunksize, data_size - send_size);
      std::map<int,SingleFlow> task_list = emit_step(nullptr, chunk_id);
      chunk_id++;
      // Remaining (nranks-1) + (nranks-2) dependent steps.
      for (int step = 1; step < chunkcout; step++) {
        task_list = emit_step(&task_list, chunk_id);
        chunk_id++;
      }
      send_size += real_chunksize;
    }
  }

  // Index every flow under both of its endpoints.
  std::map<int,FlowModels> rank2flowmodels;
  for (auto flow_it = result.begin(); flow_it != result.end(); flow_it++) {
    const int src = flow_it->second.src;
    const int dst = flow_it->second.dest;
    rank2flowmodels[src][flow_it->first] = flow_it->second;
    rank2flowmodels[dst][flow_it->first] = flow_it->second;
  }
  std::map<int,std::shared_ptr<FlowModels>> rank2pflowmodels;
  for (auto model_it = rank2flowmodels.begin(); model_it != rank2flowmodels.end(); model_it++) {
    rank2pflowmodels[model_it->first] = std::make_shared<FlowModels>(model_it->second);
  }
  return rank2pflowmodels;
}
/**
 * Generates the ring all-gather flow model for the group that `rank` belongs
 * to for group type `type`.
 *
 * `data_size` is divided by the number of ranks and by the number of ring
 * channels; every generated flow carries that share. Each ring channel is
 * walked for nRanks-1 steps. In every step each rank of the ring emits one
 * flow, or two when the PXN relay applies, and every flow after the first step
 * is chained to the flow produced in the previous step by the rank stored at
 * index 0 of the sender's ring entry.
 *
 * Ring entry layout as used here, per rank: [0] rank whose previous-step flow
 * this rank depends on, [1] destination rank, [2] / [3] the ranks this code
 * refers to as "node recv rank" / "node send rank".
 *
 * PXN relaying is active only when the environment variable AS_PXN_ENABLE is
 * exactly "1" and the group spans more than one node. (The value is compared
 * with strcmp; comparing the pointer returned by getenv against a literal can
 * never match.)
 *
 * @return flow models keyed by rank (a flow is listed under both its source
 *         and its destination); empty when the group is unknown, has no ranks
 *         or has no ring channels.
 */
std::map<int,std::shared_ptr<FlowModels>> MockNcclGroup::genAllGatherFlowModels(GroupType type , int rank,uint64_t data_size){
  MockNcclLog* NcclLog = MockNcclLog::getInstance();
  if(GroupIndex.count(std::make_pair(rank,type))==0){
    NcclLog->writeLog(NcclLogLevel::ERROR,"There is no corresponding group info and group ring channel, resulting in an error in generating the flow model.");
    return {};
  }
  const int gp_idx = GroupIndex[std::make_pair(rank,type)];
  RingChannels ringchannels = Allringchannels[gp_idx];
  GroupInfo gp_info = AllGroups[gp_idx];
  const int nranks = gp_info.nRanks;
  // Both values are used as divisors below.
  if (nranks <= 0 || ringchannels.empty()) {
    NcclLog->writeLog(NcclLogLevel::ERROR,"The group has no ranks or no ring channels, so no all-gather flow model can be generated.");
    return {};
  }
  const int chunkcount = nranks-1;

  // Per-rank, per-channel share; chunk size and total to send are the same
  // value, so the send loop below runs at most once per channel.
  const uint64_t chunksize = data_size / nranks / ringchannels.size();
  data_size = chunksize;

  const char* PXN_ENV = std::getenv("AS_PXN_ENABLE");
  const bool PXN_ENABLE = PXN_ENV && strcmp(PXN_ENV, "1") == 0;
  const bool use_pxn = PXN_ENABLE && gp_info.nNodes > 1;

  FlowModels result = {};
  for(auto it = ringchannels.begin(); it !=ringchannels.end(); it++) {
    auto ring = it->second;
    auto ring_id = it->first;
    if (ring.empty()) {
      continue;
    }

    // Emits the flows of one ring step for every rank of `ring`.
    // `prev_step` maps rank -> flow emitted by that rank in the previous step;
    // nullptr marks the first step, whose flows have no parent flow.
    // `proxy_tag` is the connection tag of the second hop of a PXN relay.
    // Returns rank -> last flow emitted for that rank in this step.
    auto emit_step = [&](std::map<int,SingleFlow>* prev_step, int chunkid, const char* proxy_tag) -> std::map<int,SingleFlow> {
      std::map<int,SingleFlow> emitted;
      int prenoderecvrank = ring.rbegin()->second[2];
      int curnoderecvrank = ring.begin()->second[2];
      int curnodesendrank = ring.begin()->second[3];
      for (auto rank_it = ring.begin(); rank_it != ring.end(); rank_it++) {
        const int cur_rank = rank_it->first;
        auto& conn = rank_it->second;
        // Moving on to a rank with a different recv/send pair: remember the
        // recv rank of the pair we just left.
        if (curnoderecvrank != conn[2] && curnodesendrank != conn[3]) {
          prenoderecvrank = curnoderecvrank;
          curnoderecvrank = conn[2];
          curnodesendrank = conn[3];
        }

        const bool has_parent = prev_step != nullptr;
        int partner_flow_id = -1;
        std::vector<int> parent_ids;
        if (has_parent) {
          // operator[] is kept on purpose: a missing entry yields a
          // default-constructed flow, exactly as before.
          partner_flow_id = (*prev_step)[conn[0]].flow_id;
          parent_ids.push_back(partner_flow_id);
        }

        // This rank is the send rank but not the recv rank: relay through the
        // recv rank first (two flows).
        const bool relay_via_proxy = use_pxn && conn[3] == cur_rank && conn[2] != cur_rank;
        // This rank is the recv rank but not the send rank.
        const bool is_recv_rank = use_pxn && conn[2] == cur_rank && conn[3] != cur_rank;

        const int prev_rank = is_recv_rank ? prenoderecvrank : conn[0];
        std::vector<int> prevranks;
        if (prev_rank != -1) {
          prevranks = {prev_rank};
        }
        std::vector<int> child_ids;
        if (relay_via_proxy) {
          child_ids.push_back(g_flow_id + 1);
        }

        SingleFlow flow = SingleFlow(
            g_flow_id,
            cur_rank,
            relay_via_proxy ? conn[2] : conn[1],
            data_size,
            prevranks,
            parent_ids,
            child_ids,
            ring_id,
            chunkid,
            chunkcount,
            "RING");
        if (has_parent) {
          result[std::make_pair(ring_id, partner_flow_id)].child_flow_id.push_back(g_flow_id);
        }
        result[std::make_pair(ring_id, g_flow_id)] = flow;
        g_flow_id++;

        if (relay_via_proxy) {
          // Second hop: the recv rank forwards to the real destination.
          // -1 is the "no rank" marker; rank 0 is a valid previous rank.
          std::vector<int> proxy_prev;
          if (cur_rank != -1) {
            proxy_prev = {cur_rank};
          }
          flow = SingleFlow(
              g_flow_id,
              conn[2],
              conn[1],
              data_size,
              proxy_prev,
              {g_flow_id - 1},
              {},
              ring_id,
              chunkid,
              chunkcount,
              proxy_tag);
          result[std::make_pair(ring_id, g_flow_id)] = flow;
          g_flow_id++;
        }
        emitted[cur_rank] = flow;
      }
      return emitted;
    };

    uint64_t send_size = 0;
    int chunkid = 0;
    while (send_size < data_size) {
      const uint64_t real_chunksize = std::min(chunksize, data_size - send_size);
      // NOTE(review): the relay hop is tagged "PXN_INIT" in the first step and
      // "PXN" afterwards, whereas the all-reduce generator uses "PXN_INIT"
      // throughout - kept as found, confirm which is intended.
      std::map<int,SingleFlow> task_list = emit_step(nullptr, chunkid, "PXN_INIT");
      chunkid++;
      // Remaining nranks-2 dependent steps.
      for (int step = 1; step < chunkcount; step++) {
        task_list = emit_step(&task_list, chunkid, "PXN");
        chunkid++;
      }
      send_size += real_chunksize;
    }
  }

  // Index every flow under both of its endpoints.
  std::map<int,FlowModels> rank2flowmodels;
  for (auto flow_it = result.begin(); flow_it != result.end(); flow_it++) {
    const int src = flow_it->second.src;
    const int dst = flow_it->second.dest;
    rank2flowmodels[src][flow_it->first] = flow_it->second;
    rank2flowmodels[dst][flow_it->first] = flow_it->second;
  }
  std::map<int,std::shared_ptr<FlowModels>> rank2pflowmodels;
  for (auto model_it = rank2flowmodels.begin(); model_it != rank2flowmodels.end(); model_it++) {
    rank2pflowmodels[model_it->first] = std::make_shared<FlowModels>(model_it->second);
  }
  return rank2pflowmodels;
}
/**
 * Builds the intra-node part of an NVLS tree: root -> switch -> leaves.
 *
 * `intra_topo[0]` becomes the root, `intra_topo[1]` its only child (the
 * switch), and every remaining entry a leaf below the switch. Each created
 * node is also appended to `nvlstreechannel` under its rank.
 *
 * The nodes are allocated with `new` and handed out as raw pointers; this
 * function never frees them.
 *
 * @param intra_topo      rank list; the first two entries are read
 *                        unconditionally
 * @param nvlstreechannel rank -> nodes registry, extended in place
 * @return the root node
 */
ncclChannelNode* MockNcclGroup::gen_nvls_tree_intra_channels(std::vector<int> intra_topo,std::map<int, vector<ncclChannelNode*>> &nvlstreechannel){
  ncclChannelNode* tree_root = new ncclChannelNode(-1,intra_topo[0],nullptr,{});
  nvlstreechannel[tree_root->rank].push_back(tree_root);

  ncclChannelNode* switch_node = new ncclChannelNode(-1,intra_topo[1],tree_root,{});
  nvlstreechannel[switch_node->rank].push_back(switch_node);
  tree_root->down.push_back(switch_node);

  for (std::size_t pos = 2; pos < intra_topo.size(); ++pos) {
    ncclChannelNode* leaf_node = new ncclChannelNode(-1,intra_topo[pos],switch_node,{});
    switch_node->down.push_back(leaf_node);
    nvlstreechannel[leaf_node->rank].push_back(leaf_node);
  }
  return tree_root;
}
/**
 * Builds the single NVLS channel (channel id 0) for the group that `rank`
 * belongs to for group type `type`, and stores it in AllNVLSchannels.
 *
 * Every rank of the group gets a tree entry whose parent is the group's first
 * NVSwitch; the switch itself gets an entry with no parent and all ranks as
 * children.
 *
 * @return the channel set; empty when the group is unknown, spans more than
 *         one node, or lists no NVSwitch.
 */
TreeChannels MockNcclGroup::get_nvls_channels(int rank,GroupType type){
  MockNcclLog* NcclLog = MockNcclLog::getInstance();
  if(GroupIndex.count(std::make_pair(rank,type))==0){
    NcclLog->writeLog(NcclLogLevel::ERROR,"There is no corresponding group info and group ring channel, resulting in an error in get_nvls_channels.");
    return {};
  }
  const int gp_idx = GroupIndex[std::make_pair(rank,type)];
  GroupInfo gp_info = AllGroups[gp_idx];
  if (gp_info.nNodes > 1) {
    // The argument is a string, so it must be formatted with %s, not %d.
    NcclLog->writeLog(NcclLogLevel::DEBUG," %s","error NVLS ALGO dont");
    return {};
  }
  if (gp_info.NVSwitchs.empty()) {
    NcclLog->writeLog(NcclLogLevel::ERROR,"The group has no NVSwitch, resulting in an error in get_nvls_channels.");
    return {};
  }

  std::vector<int> ranks = gp_info.Ranks;
  const int NVswitch = gp_info.NVSwitchs[0];
  TreeChannels nvlschannel;
  for (int member : ranks) {
    nvlschannel[0][member] = ncclTree(-1, member, NVswitch, {});
  }
  // NOTE(review): the switch entry is keyed by ranks.size(), not by the switch
  // id; if a member rank happens to equal ranks.size() its entry is
  // overwritten here. Kept as found - confirm against the consumers.
  nvlschannel[0][ranks.size()] = ncclTree(-1, NVswitch, -1, ranks);

  AllNVLSchannels[gp_idx] = nvlschannel;
  return nvlschannel;
}
/**
 * Returns the NVLS tree channels of the group that `rank` belongs to for group
 * type `type`, building and caching them in AllNVLStreechannels on first use.
 *
 * One channel is produced per (inter-node tree root, local rank index) pair:
 * for every node an intra-node tree is created whose root is the rank at that
 * local index, and the per-node roots are then linked according to the
 * inter-node tree.
 *
 * Only the rank layout derived from local ring 0 is used.
 *
 * @return the channels keyed by channel id; empty when the group is unknown or
 *         has no nodes.
 */
NVLStreechannels MockNcclGroup::get_nvls_tree_channels(int rank,GroupType type){
  MockNcclLog* NcclLog = MockNcclLog::getInstance();
  if(GroupIndex.count(std::make_pair(rank,type))==0){
    NcclLog->writeLog(NcclLogLevel::ERROR,"There is no corresponding group info , resulting in an error in get_nvls_tree_channels.");
    return {};
  }
  const int gp_idx = GroupIndex[std::make_pair(rank,type)];
  GroupInfo gp_info = AllGroups[gp_idx];
  if(AllNVLStreechannels.count(gp_idx)){
    return AllNVLStreechannels[gp_idx];
  }

  const int nNodes = gp_info.nNodes;
  // nNodes is used as a divisor and the inter-node tree needs at least one node.
  if (nNodes <= 0) {
    NcclLog->writeLog(NcclLogLevel::ERROR,"The group has no nodes, resulting in an error in get_nvls_tree_channels.");
    return {};
  }

  std::vector<DoubleBinaryTreeNode*> roots = genInterDouBinTree(gp_info);
  const int nlocalRanks = gp_info.nRanks/nNodes;
  std::map<int,std::vector<int>> localrings = gen_local_ring(rank,type);
  // Rank-id distance between corresponding ranks of two consecutive nodes.
  const int delta = nNodes > 1 ? gp_info.Ranks[nlocalRanks]-gp_info.Ranks[0] : 0;

  // allnode2ranks[ring id][node index] = ranks of that node in ring order,
  // obtained by shifting the local ring by node index * delta.
  std::map<int, std::map<int, std::vector<int>>> allnode2ranks;
  for (auto ring_it = localrings.begin(); ring_it != localrings.end(); ring_it++) {
    for (int node = 0; node < nNodes; node++) {
      for (int j = 0; j < nlocalRanks; j++) {
        allnode2ranks[ring_it->first][node].push_back(ring_it->second[j] + node * delta);
      }
    }
  }
  std::map<int, std::vector<int>> node2ranks = allnode2ranks[0];

  std::map<int,std::map<int,std::vector<ncclChannelNode*>>> nvlstreechannels;
  int channel_id = 0;
  for (DoubleBinaryTreeNode* root : roots) {
    for (int index = 0; index < nlocalRanks; index++) {
      std::map<int, vector<ncclChannelNode*>> nvlstreechannel;
      // node index -> root of that node's intra-node tree
      std::map<int,ncclChannelNode*> nodencclchannlenodes;
      for (int i = 0; i < nNodes; i++) {
        std::vector<int> noderanks = node2ranks[i];
        // Layout expected by gen_nvls_tree_intra_channels:
        // [root rank, switch, all ranks of the node...]
        std::vector<int> intra_topo;
        intra_topo.push_back(noderanks[index]);
        intra_topo.push_back(gp_info.NVSwitchs[i]);
        intra_topo.insert(intra_topo.end(), noderanks.begin(), noderanks.end());
        NcclLog->writeLog(NcclLogLevel::DEBUG," node %d intra_topo",i);
        for(auto num:intra_topo){
          NcclLog->writeLog(NcclLogLevel::DEBUG," %d",num);
        }
        nodencclchannlenodes[i] = gen_nvls_tree_intra_channels(intra_topo, nvlstreechannel);
      }

      // Debug dump of the intra-node trees (before inter-node linking).
      if (rank == 0) {
        for (auto nodes_it = nvlstreechannel.begin(); nodes_it != nvlstreechannel.end(); nodes_it++) {
          NcclLog->writeLog(NcclLogLevel::DEBUG," rank %d nvls tree nodes ",nodes_it->first);
          int node_pos = 0;
          for (auto nvlstreenode : nodes_it->second) {
            NcclLog->writeLog(NcclLogLevel::DEBUG," node %d rank %d",node_pos,nvlstreenode->rank);
            if(nvlstreenode->up!=nullptr)
              NcclLog->writeLog(NcclLogLevel::DEBUG," up %d",nvlstreenode->up->rank);
            NcclLog->writeLog(NcclLogLevel::DEBUG," down ");
            for (auto down : nvlstreenode->down) {
              NcclLog->writeLog(NcclLogLevel::DEBUG," %d ",down->rank);
            }
            node_pos++;
          }
        }
      }

      gen_nvls_tree_inter_channels(root, nodencclchannlenodes, nvlstreechannel);
      nvlstreechannels[channel_id] = nvlstreechannel;
      channel_id++;
    }
  }
  AllNVLStreechannels[gp_idx] = nvlstreechannels;
  return nvlstreechannels;
}
/**
 * Links the per-node channel roots to each other following the inter-node
 * tree rooted at `root`: for every tree edge parent -> child, the child's
 * channel root is appended to the parent's `down` list and its `up` pointer is
 * set to the parent. Children are processed left first, then right,
 * recursively.
 *
 * @param root                 inter-node tree node to start from (may be null)
 * @param nodencclchannlenodes node index -> channel root of that node
 * @param nvlstreechannel      passed through to the recursive calls; not
 *                             modified here
 * @return the channel root registered for `root`'s node, or nullptr when
 *         `root` is null. (Previously control fell off the end of this
 *         value-returning function on the non-null path.)
 */
ncclChannelNode* MockNcclGroup::gen_nvls_tree_inter_channels(
    DoubleBinaryTreeNode* root,
    std::map<int, ncclChannelNode*> nodencclchannlenodes,
    std::map<int, vector<ncclChannelNode*>>& nvlstreechannel) {
  MockNcclLog* NcclLog = MockNcclLog::getInstance();
  if (root == nullptr) {
    return nullptr;
  }

  // %d needs an int: log the child's node id (-1 when absent) instead of
  // passing the raw pointer.
  const int right_id = root->right != nullptr ? root->right->node : -1;
  const int left_id = root->left != nullptr ? root->left->node : -1;
  NcclLog->writeLog(NcclLogLevel::DEBUG,"before root->right: %d",right_id);
  NcclLog->writeLog(NcclLogLevel::DEBUG,"before root->left: %d",left_id);

  ncclChannelNode* cur = nodencclchannlenodes[root->node];
  if (root->left != nullptr) {
    NcclLog->writeLog(NcclLogLevel::DEBUG,"after root->left: %d",left_id);
    ncclChannelNode* left = nodencclchannlenodes[root->left->node];
    cur->down.push_back(left);
    left->up = cur;
    gen_nvls_tree_inter_channels(root->left,nodencclchannlenodes,nvlstreechannel);
  }
  if (root->right != nullptr) {
    NcclLog->writeLog(NcclLogLevel::DEBUG,"after root->right: %d",right_id);
    ncclChannelNode* right = nodencclchannlenodes[root->right->node];
    cur->down.push_back(right);
    right->up = cur;
    gen_nvls_tree_inter_channels(root->right,nodencclchannlenodes,nvlstreechannel);
  }
  return cur;
}
/**
 * Returns the tree channels of the group that `rank` belongs to for group type
 * `type`, building them on first use.
 *
 * For every local ring the per-node rank lists are derived by shifting the
 * ring by node index * delta; for each such layout and each inter-node tree
 * root one channel is created by ConnInterIntraTree. The result is cached in
 * Alltreechannels (only when at least one layout exists).
 *
 * @return channel id -> (rank -> tree entry); empty when the group is unknown.
 */
TreeChannels MockNcclGroup::gettreechannels(int rank, GroupType type){
  MockNcclLog* NcclLog = MockNcclLog::getInstance();
  if(GroupIndex.count(std::make_pair(rank,type))==0){
    NcclLog->writeLog(NcclLogLevel::ERROR,"There is no corresponding group info and group ring channel, resulting in an error in gettreechannels.");
    return {};
  }
  const int gp_idx = GroupIndex[std::make_pair(rank,type)];
  GroupInfo gp_info = AllGroups[gp_idx];
  if(Alltreechannels.count(gp_idx)){
    return Alltreechannels[gp_idx];
  }

  const int nNodes = gp_info.nNodes;
  const int nlocalRanks = gp_info.nRanks/nNodes;
  std::map<int,std::vector<int>> localrings = gen_local_ring(rank,type);
  // Rank-id distance between corresponding ranks of two consecutive nodes.
  const int delta = nNodes > 1 ? gp_info.Ranks[nlocalRanks]-gp_info.Ranks[0] : 0;

  // allnode2ranks[ring id][node index] = ranks of that node in ring order.
  std::map<int, std::map<int, std::vector<int>>> allnode2ranks;
  for (const auto& local_ring : localrings) {
    for (int node = 0; node < nNodes; node++) {
      for (int j = 0; j < nlocalRanks; j++) {
        allnode2ranks[local_ring.first][node].push_back(local_ring.second[j] + node * delta);
      }
    }
  }

  std::vector<DoubleBinaryTreeNode*> roots = genInterDouBinTree(gp_info);

  TreeChannels treechannels;
  int channel_id = 0;
  for (const auto& layout : allnode2ranks) {
    const std::map<int, std::vector<int>>& node2ranks = layout.second;
    for (DoubleBinaryTreeNode* root : roots) {
      // Start from unconnected entries for every rank of the group.
      std::map<int, ncclTree> treechannel;
      for (int member : gp_info.Ranks) {
        treechannel[member] = ncclTree(-1, member, -1, {});
      }
      ConnInterIntraTree(root, node2ranks, treechannel);
      treechannels[channel_id] = treechannel;
      ++channel_id;
    }
  }
  if (!allnode2ranks.empty()) {
    Alltreechannels[gp_idx] = treechannels;
  }
  return treechannels;
}
/**
 * Fills in the up/down links of `treechannel` for the subtree rooted at
 * `root`.
 *
 * Within a node the ranks are chained in list order (ranks[i] is the parent of
 * ranks[i+1]). Between nodes, the first rank of a node is the parent of the
 * first rank of each child node of the inter-node tree. Children are handled
 * left first, then right, recursively.
 *
 * A node without ranks contributes no links, but its children are still
 * visited. (The previous index arithmetic on an empty rank list wrapped around
 * and read out of bounds.)
 *
 * @param root        inter-node tree node to start from (may be null)
 * @param node2ranks  node index -> ranks of that node
 * @param treechannel rank -> tree entry, updated in place
 */
void MockNcclGroup::ConnInterIntraTree(DoubleBinaryTreeNode*root,std::map<int,std::vector<int>>node2ranks,std::map<int,ncclTree>&treechannel) {
  if(root == nullptr) return;
  std::vector<int>ranks = node2ranks[root->node];

  // Intra-node chain.
  for(std::size_t i = 0; i + 1 < ranks.size(); i++) {
    ncclTree *current = &treechannel[ranks[i]];
    ncclTree *down = &treechannel[ranks[i+1]];
    current->down.push_back(ranks[i+1]);
    down->up=ranks[i];
  }

  // Links this node's first rank to the first rank of `child`'s node, then
  // descends into `child`.
  auto connect_child = [&](DoubleBinaryTreeNode* child) {
    if (child == nullptr) return;
    const std::vector<int>& child_ranks = node2ranks[child->node];
    if (!ranks.empty() && !child_ranks.empty()) {
      const int downrank = child_ranks[0];
      ncclTree *current = &treechannel[ranks[0]];
      ncclTree *down = &treechannel[downrank];
      current->down.push_back(downrank);
      down->up = ranks[0];
    }
    ConnInterIntraTree(child,node2ranks,treechannel);
  };
  connect_child(root->left);
  connect_child(root->right);
}
/**
 * Builds the two inter-node trees for a group with `gp_info.nNodes` nodes.
 *
 * The first tree is assembled bottom-up over node indices 0..nNodes-1: on each
 * level, of every four consecutive entries the second becomes the parent of
 * the first and third, and the fourth is carried to the next level unchanged;
 * a trailing pair is joined (second is parent of first) and a trailing single
 * entry is carried over. This repeats until one root remains. The second tree
 * is the first one passed through InterDouBinTreeShift.
 *
 * Tree nodes are allocated with `new`; this function never frees them.
 *
 * @return {root of first tree, root of shifted tree}; empty when the group has
 *         no nodes.
 */
std::vector<MockNcclGroup::DoubleBinaryTreeNode*> MockNcclGroup::genInterDouBinTree(GroupInfo gp_info){
  vector<DoubleBinaryTreeNode*> result;
  const int nNodes = gp_info.nNodes;
  if (nNodes <= 0) {
    // Nothing to build; level[0] below would not exist.
    return result;
  }

  std::vector<int> nodes;
  vector<DoubleBinaryTreeNode*> level;
  for(int i = 0;i < nNodes; i++) {
    nodes.push_back(i);
    level.push_back(new DoubleBinaryTreeNode(i));
  }

  while (level.size() > 1){
    vector<DoubleBinaryTreeNode*> next_level;
    std::size_t i = 0;
    for(; i + 2 < level.size(); i += 4){
      DoubleBinaryTreeNode* parent = level[i+1];
      parent->left = level[i];
      parent->right = level[i+2];
      next_level.push_back(parent);
      if(i + 3 < level.size()) {
        next_level.push_back(level[i+3]);
      }
    }
    // i may have stepped past the end (e.g. 3 entries -> i == 4); treat that
    // as "nothing left" instead of relying on unsigned wrap-around.
    const std::size_t leftover = i < level.size() ? level.size() - i : 0;
    if(leftover == 1) {
      next_level.push_back(level[i]);
    } else if(leftover == 2){
      DoubleBinaryTreeNode* parent = level[i+1];
      parent->left = level[i];
      next_level.push_back(parent);
    }
    level = next_level;
  }

  DoubleBinaryTreeNode* root0 = level[0];
  DoubleBinaryTreeNode* root1 = InterDouBinTreeShift(root0,nodes);
  result.push_back(root0);
  result.push_back(root1);
  return result;
}
/**
 * Creates a copy of the tree rooted at `root` in which every node is replaced
 * by its successor in `nodes` (the entry one position later, wrapping from the
 * last entry to the first). The parent/left/right shape is preserved.
 *
 * The new tree nodes are allocated with `new`; this function never frees them.
 *
 * @param root  root of the tree to mirror
 * @param nodes ordered list of node ids that defines the shift
 * @return root of the shifted tree, or nullptr when `root` is null or `nodes`
 *         is empty.
 */
MockNcclGroup::DoubleBinaryTreeNode* MockNcclGroup::InterDouBinTreeShift(DoubleBinaryTreeNode* root,std::vector<int>nodes){
  if (root == nullptr || nodes.empty()) {
    return nullptr;
  }

  std::map<int,DoubleBinaryTreeNode*>node2treenode;
  std::map<int,int>rank2index;
  for(std::size_t i = 0; i < nodes.size(); i++) {
    node2treenode[nodes[i]] = new DoubleBinaryTreeNode(nodes[i]);
    rank2index[nodes[i]] = static_cast<int>(i);
  }

  // Tree node that takes the place of `node` in the shifted tree. The modulo
  // is applied to the index, so the last entry wraps to the first one.
  auto shifted = [&](int node) -> DoubleBinaryTreeNode* {
    return node2treenode[nodes[(rank2index[node] + 1) % nodes.size()]];
  };

  // Breadth-first walk over the source tree, wiring up the shifted copies.
  std::queue<DoubleBinaryTreeNode*> pending;
  pending.push(root);
  while (!pending.empty()) {
    DoubleBinaryTreeNode* current = pending.front();
    pending.pop();
    DoubleBinaryTreeNode* currentshift = shifted(current->node);
    if(current->left != nullptr) {
      currentshift->left = shifted(current->left->node);
      pending.push(current->left);
    }
    if(current->right != nullptr) {
      currentshift->right = shifted(current->right->node);
      pending.push(current->right);
    }
  }
  // Was nodes[index + 1] % size, which indexes past the end of `nodes` when
  // the root is the last entry.
  return shifted(root->node);
}
/**
 * Returns the algorithm/protocol descriptor for a collective, creating and
 * caching it in `nccl_infos` on first use.
 *
 * The cache key is "<group name>_<op as int>_<data_size>", where the group
 * name is "TP", "DP", "EP" or "DP_EP" (empty for any other group type).
 *
 * Algorithm choice: NCCL_ALGO_NVLS only for an All_Reduce on a TP group running
 * on H100/H800 with at least 8 ranks while the environment variable
 * AS_NVLS_ENABLE is exactly "1"; NCCL_ALGO_RING in every other case. The
 * protocol is always NCCL_PROTO_UNDEF and nChannels is 0.
 *
 * The descriptor is allocated with `new` and kept in the cache; this function
 * never frees it.
 *
 * @return the descriptor, or nullptr when `rank` has no group of type `type`.
 */
ncclInfo* MockNcclGroup::get_algo_proto_info(
    GroupType type,
    int rank,
    AstraSim::ComType op,
    uint64_t data_size) {
  MockNcclLog* NcclLog = MockNcclLog::getInstance();
  if(GroupIndex.count(std::make_pair(rank,type))==0){
    NcclLog->writeLog(NcclLogLevel::ERROR,"There is no corresponding group info, resulting in an error with get_algo_proto_info.");
    return nullptr;
  }
  GroupInfo gp_info = AllGroups[GroupIndex[std::make_pair(rank,type)]];

  // Cache key: group name, operation and payload size.
  std::string ncclInfoName;
  if (type == TP) {
    ncclInfoName = "TP";
  } else if (type == DP) {
    ncclInfoName = "DP";
  } else if (type == EP) {
    ncclInfoName = "EP";
  } else if (type == DP_EP) {
    ncclInfoName = "DP_EP";
  }
  ncclInfoName += "_";
  ncclInfoName += std::to_string(static_cast<int>(op));
  ncclInfoName += "_";
  ncclInfoName += std::to_string(data_size);

  auto cached = nccl_infos.find(ncclInfoName);
  if (cached != nccl_infos.end()) {
    return cached->second;
  }

  const char* NVLSEnv = std::getenv("AS_NVLS_ENABLE");
  const bool NVLSenable = NVLSEnv && strcmp(NVLSEnv, "1")==0;

  struct ncclInfo* info = new ncclInfo();
  info->nBytes = data_size;
  info->nChannels = 0;
  info->coll = static_cast<ncclFunc_t>(op);

  const bool is_h100_family = gpu_type==GPUType::H100 || gpu_type==GPUType::H800;
  const bool pick_nvls = op == AstraSim::ComType::All_Reduce
      && type == TP
      && is_h100_family
      && gp_info.nRanks >= 8
      && NVLSenable;
  if (pick_nvls) {
    info->algorithm = NCCL_ALGO_NVLS;
  } else {
    info->algorithm = NCCL_ALGO_RING;
  }
  info->protocol = NCCL_PROTO_UNDEF;

  nccl_infos[ncclInfoName] = info;
  return info;
}
}