in astra-sim-alibabacloud/astra-sim/system/Sys.cc [1711:1794]
/**
 * Moves a baseline stream out of its current collective phase.
 *
 * Steps, in order:
 *  1. Finalise bookkeeping for the phase being left: adjust
 *     first_phase_streams, turn the accumulated network latency of the phase
 *     into a per-message average, and free the phase's algorithm object.
 *  2. If the phase was enabled and the stream had a valid queue id, remove
 *     the stream from that queue's list in active_Streams.
 *  3. If no phases remain: report the stream as finished to its dataset,
 *     decrement total_running_streams, notify the scheduler, delete the
 *     stream and return.
 *  4. Otherwise: pop the next phase from phases_to_go, reset the per-phase
 *     counters, insert the stream into the new queue (if the phase is
 *     enabled), mark it Ready and notify the scheduler of the removal from
 *     the old queue and the addition to the new one.
 *
 * @param stream Stream to advance. It is deleted by this function when it has
 *               no phases left, so callers must not use the pointer afterwards
 *               in that case.
 */
void Sys::proceed_to_next_vnet_baseline(StreamBaseline* stream) {
  MockNcclLog* NcclLog = MockNcclLog::getInstance();
  // The format string uses %d, so the container size (an unsigned size type)
  // is converted to int explicitly to match the specifier.
  NcclLog->writeLog(NcclLogLevel::DEBUG,"proceed_to_next_vnet_baseline :: phase1, stream->current_queue_id %d stream->phases_to_go.size %d",stream->current_queue_id,static_cast<int>(stream->phases_to_go.size()));

  // Remember the queue being left; current_queue_id is overwritten below.
  int previous_vnet = stream->current_queue_id;

  if (stream->steps_finished == 1) {
    first_phase_streams--;
  }
  if (stream->steps_finished != 0) {
    // Convert the accumulated latency of the finished phase into an average.
    // NOTE(review): there is no guard for net_message_counter == 0 here —
    // confirm a phase can never finish without counting a message.
    stream->net_message_latency.back() /= stream->net_message_counter;
  }

  // Deleting a null pointer is a no-op, so no explicit null check is needed.
  delete stream->my_current_phase.algorithm;

  if (stream->phases_to_go.empty()) {
    stream->take_bus_stats_average();
    stream->dataset->notify_stream_finished(static_cast<StreamStat*>(stream));
  }

  NcclLog->writeLog(NcclLogLevel::DEBUG,"proceed_to_next_vnet_baseline :: phase2");
  if (stream->current_queue_id >= 0 && stream->my_current_phase.enabled) {
    // Drop this stream (matched by stream_num) from the queue it is leaving.
    std::list<BaseStream*>& target =
        active_Streams.at(stream->my_current_phase.queue_id);
    for (auto it = target.begin(); it != target.end(); ++it) {
      if (static_cast<StreamBaseline*>(*it)->stream_num == stream->stream_num) {
        target.erase(it);
        break;  // `it` is invalidated by erase; stop immediately.
      }
    }
  }

  NcclLog->writeLog(NcclLogLevel::DEBUG,"proceed_to_next_vnet_baseline :: phase2-1");
  if (stream->phases_to_go.empty()) {
    // Last phase done: retire the stream entirely.
    total_running_streams--;
    if (previous_vnet >= 0) {
      // NOTE(review): this message repeats the "phase2-1" tag used above.
      NcclLog->writeLog(NcclLogLevel::DEBUG,"proceed_to_next_vnet_baseline :: phase2-1");
      scheduler_unit->notify_stream_removed(
          previous_vnet, Sys::boostedTick() - stream->last_init);
    }
#ifdef PHY_MTP
    running_list.pop_front();
#endif
    NcclLog->writeLog(NcclLogLevel::DEBUG,"proceed_to_next_vnet_baseline :: delete stream");
    delete stream;
    return;
  }

  NcclLog->writeLog(NcclLogLevel::DEBUG,"proceed_to_next_vnet_baseline :: phase3");
  stream->steps_finished++;
  {
    // Scoped so the reference cannot be used after pop_front() invalidates it.
    const CollectivePhase& next_phase = stream->phases_to_go.front();
    stream->current_queue_id = next_phase.queue_id;
    stream->current_com_type = next_phase.comm_type;
    stream->my_current_phase = next_phase;
  }
  stream->phases_to_go.pop_front();

  // Reset the per-phase state for the phase that is about to start.
  stream->test = 0;
  stream->test2 = 0;
  stream->initialized = false;
  stream->last_phase_change = Sys::boostedTick();
  stream->total_packets_sent = 0;
  stream->net_message_latency.push_back(0);
  stream->net_message_counter = 0;

  // NOTE(review): the message text says "phase1" although it is emitted after
  // the "phase3" marker; left unchanged.
  NcclLog->writeLog(NcclLogLevel::DEBUG,"proceed_to_next_vnet_baseline :: phase1, stream->current_queue_id %d stream->phases_to_go.size %d",stream->current_queue_id,static_cast<int>(stream->phases_to_go.size()));
  if (stream->my_current_phase.enabled) {
    insert_stream(&active_Streams[stream->current_queue_id], stream);
  }

  NcclLog->writeLog(NcclLogLevel::DEBUG,"proceed_to_next_vnet_baseline :: phase4");
  stream->state = StreamState::Ready;
  if (previous_vnet >= 0) {
    scheduler_unit->notify_stream_removed(
        previous_vnet, Sys::boostedTick() - stream->last_init);
  }
#ifdef PHY_MTP
  ready_list.pop_front();
  first_phase_streams++;
  total_running_streams++;
#endif
  scheduler_unit->notify_stream_added(stream->current_queue_id);
  NcclLog->writeLog(NcclLogLevel::DEBUG,"proceed_to_next_vnet_baseline :: exit");
}