void Sys::proceed_to_next_vnet_baseline()

in astra-sim-alibabacloud/astra-sim/system/Sys.cc [1711:1794]


// Advances a baseline stream to its next collective phase, or retires the
// stream when it has no phases left.
//
// Steps, in order:
//   1. Close out the phase that just finished: adjust first_phase_streams,
//      average the latest net_message_latency entry, and free the phase's
//      algorithm object.
//   2. If no phases remain, hand the stream's statistics to its dataset.
//   3. Remove the stream from the active_Streams list of the phase it just
//      ran (only when it had a queue assigned and that phase was enabled).
//   4. If no phases remain: update counters, tell the scheduler the stream
//      left its queue, delete the stream and return.
//   5. Otherwise pop the next phase from phases_to_go, reset the per-phase
//      fields, insert the stream into the new queue's active list, mark it
//      Ready and notify the scheduler of the queue change.
//
// `stream` is deleted by this function when phases_to_go is empty on entry;
// the caller must not touch it afterwards in that case.
void Sys::proceed_to_next_vnet_baseline(StreamBaseline* stream) {
  MockNcclLog* NcclLog = MockNcclLog::getInstance();
  // %d expects an int; size() yields an unsigned size type, so convert it
  // explicitly instead of passing a mismatched argument to the format.
  NcclLog->writeLog(
      NcclLogLevel::DEBUG,
      "proceed_to_next_vnet_baseline :: phase1, stream->current_queue_id %d stream->phases_to_go.size %d",
      stream->current_queue_id,
      static_cast<int>(stream->phases_to_go.size()));

  // Remember the queue the stream is leaving; current_queue_id is overwritten
  // below when the next phase is installed.
  const int previous_vnet = stream->current_queue_id;

  if (stream->steps_finished == 1) {
    first_phase_streams--;
  }
  if (stream->steps_finished != 0) {
    // Turn the latency accumulated for the finished phase into a per-message
    // average.
    // NOTE(review): nothing here guards against net_message_counter == 0 --
    // confirm a finished phase always records at least one message.
    stream->net_message_latency.back() /= stream->net_message_counter;
  }

  // Free the finished phase's algorithm and clear the pointer so the stream
  // never carries a dangling pointer through the calls below. Deleting a null
  // pointer is a no-op, so no separate null check is needed.
  delete stream->my_current_phase.algorithm;
  stream->my_current_phase.algorithm = nullptr;

  if (stream->phases_to_go.size() == 0) {
    // Last phase done: finalize and publish statistics before teardown.
    stream->take_bus_stats_average();
    stream->dataset->notify_stream_finished((StreamStat*)stream);
  }
  NcclLog->writeLog(NcclLogLevel::DEBUG,"proceed_to_next_vnet_baseline :: phase2");

  if (stream->current_queue_id >= 0 && stream->my_current_phase.enabled) {
    // Drop this stream (matched by stream_num) from the active list of the
    // queue its finished phase was running on.
    std::list<BaseStream*>& target =
        active_Streams.at(stream->my_current_phase.queue_id);
    for (auto it = target.begin(); it != target.end(); ++it) {
      if (((StreamBaseline*)(*it))->stream_num == stream->stream_num) {
        target.erase(it);
        break;  // `it` is invalid after erase; at most one entry is removed.
      }
    }
  }
  NcclLog->writeLog(NcclLogLevel::DEBUG,"proceed_to_next_vnet_baseline :: phase2-1");

  if (stream->phases_to_go.size() == 0) {
    // Nothing left to run: retire the stream.
    total_running_streams--;
    if (previous_vnet >= 0) {
      NcclLog->writeLog(NcclLogLevel::DEBUG,"proceed_to_next_vnet_baseline :: phase2-1");
      scheduler_unit->notify_stream_removed(
          previous_vnet, Sys::boostedTick() - stream->last_init);
    }
    #ifdef PHY_MTP
    running_list.pop_front();
    #endif
    NcclLog->writeLog(NcclLogLevel::DEBUG,"proceed_to_next_vnet_baseline :: delete stream");
    delete stream;
    return;
  }
  NcclLog->writeLog(NcclLogLevel::DEBUG,"proceed_to_next_vnet_baseline :: phase3");

  // Install the next phase. The reference is only used before pop_front().
  stream->steps_finished++;
  const CollectivePhase& next_phase = stream->phases_to_go.front();
  stream->current_queue_id = next_phase.queue_id;
  stream->current_com_type = next_phase.comm_type;
  stream->my_current_phase = next_phase;
  stream->phases_to_go.pop_front();

  // Reset per-phase state.
  stream->test = 0;
  stream->test2 = 0;
  stream->initialized = false;
  stream->last_phase_change = Sys::boostedTick();
  stream->total_packets_sent = 0;

  // Start a fresh latency accumulator for the new phase.
  stream->net_message_latency.push_back(0);
  stream->net_message_counter = 0;
  NcclLog->writeLog(
      NcclLogLevel::DEBUG,
      "proceed_to_next_vnet_baseline :: phase1, stream->current_queue_id %d stream->phases_to_go.size %d",
      stream->current_queue_id,
      static_cast<int>(stream->phases_to_go.size()));

  if (stream->my_current_phase.enabled) {
    insert_stream(&active_Streams[stream->current_queue_id], stream);
  }
  NcclLog->writeLog(NcclLogLevel::DEBUG,"proceed_to_next_vnet_baseline :: phase4");

  stream->state = StreamState::Ready;

  if (previous_vnet >= 0) {
    scheduler_unit->notify_stream_removed(
        previous_vnet, Sys::boostedTick() - stream->last_init);
  }
  #ifdef PHY_MTP
  ready_list.pop_front();
  first_phase_streams++;
  total_running_streams++;
  #endif

  scheduler_unit->notify_stream_added(stream->current_queue_id);

  NcclLog->writeLog(NcclLogLevel::DEBUG,"proceed_to_next_vnet_baseline :: exit");
}