in astra-sim-alibabacloud/astra-sim/system/Sys.cc [1796:1872]
// Inserts `baseStream` into `queue` at a position selected by
// `intra_dimension_scheduling`.
//
// Every policy scans from the front of the queue and steps over entries whose
// `initialized` flag is set, so the new stream is never placed ahead of the
// leading run of initialized entries. What happens after that depends on the
// policy:
//   * FIFO (also used whenever the stream has a negative `current_queue_id`
//     or its `current_com_type` is All_to_All / All_Reduce): step over
//     entries whose priority is >= the new stream's priority.
//   * RG: step over entries with strictly greater priority, and also over an
//     entry that, together with the entry examined just before it, forms a
//     Reduce_Scatter/All_Gather pair (in either order).
//   * SmallestFirst: step over entries whose current phase has a strictly
//     smaller `initial_data_size`.
//   * LessRemainingPhaseFirst: step over entries with strictly fewer
//     `phases_to_go`.
// For any other policy value no scan happens and the stream is inserted at the
// front of the queue.
void Sys::insert_stream(std::list<BaseStream*>* queue, BaseStream* baseStream) {
  auto pos = queue->begin();
  const auto tail = queue->end();

  const bool use_priority_order =
      intra_dimension_scheduling == IntraDimensionScheduling::FIFO ||
      baseStream->current_queue_id < 0 ||
      baseStream->current_com_type == ComType::All_to_All ||
      baseStream->current_com_type == ComType::All_Reduce;

  if (use_priority_order) {
    while (pos != tail &&
           ((*pos)->initialized == true ||
            (*pos)->priority >= baseStream->priority)) {
      ++pos;
    }
  } else if (intra_dimension_scheduling == IntraDimensionScheduling::RG) {
    // Communication types of the two most recently examined entries.
    ComType before_previous = ComType::None;
    ComType previous = ComType::None;
    while (pos != tail) {
      before_previous = previous;
      previous = (*pos)->current_com_type;

      if ((*pos)->initialized == true) {
        ++pos;
        // The entry directly after an initialized one is recorded and stepped
        // over as well when it is not initialized itself.
        if (pos != tail && (*pos)->initialized == false) {
          before_previous = previous;
          previous = (*pos)->current_com_type;
          ++pos;
        }
        continue;
      }

      const bool higher_priority = (*pos)->priority > baseStream->priority;
      const bool completes_rs_ag_pair =
          (previous == ComType::Reduce_Scatter &&
           before_previous == ComType::All_Gather) ||
          (previous == ComType::All_Gather &&
           before_previous == ComType::Reduce_Scatter);
      if (!higher_priority && !completes_rs_ag_pair) {
        break;
      }
      ++pos;
    }
  } else if (
      intra_dimension_scheduling == IntraDimensionScheduling::SmallestFirst) {
    while (pos != tail &&
           ((*pos)->initialized == true ||
            (*pos)->my_current_phase.initial_data_size <
                baseStream->my_current_phase.initial_data_size)) {
      ++pos;
    }
  } else if (
      intra_dimension_scheduling ==
      IntraDimensionScheduling::LessRemainingPhaseFirst) {
    while (pos != tail &&
           ((*pos)->initialized == true ||
            (*pos)->phases_to_go.size() < baseStream->phases_to_go.size())) {
      ++pos;
    }
  }

  queue->insert(pos, baseStream);
}