void NcclTreeFlowModel::insert_packets(int channel_id, int flow_id)

in astra-sim-alibabacloud/astra-sim/system/collective/NcclTreeFlowModel.cc [445:511]


void NcclTreeFlowModel::insert_packets(int channel_id, int flow_id) {
  MockNcclLog* NcclLog = MockNcclLog::getInstance();
  assert(channel_id < m_channels);
  if (!enabled) {
    return;
  }
  assert(_flow_models.count(std::make_pair(channel_id, flow_id)) != 0);

  MockNccl::SingleFlow f = _flow_models[std::make_pair(channel_id, flow_id)];
  assert(zero_latency_packets->count(channel_id) != 0 && non_zero_latency_packets->count(channel_id) != 0);
  if ((*zero_latency_packets)[channel_id] == 0 && (*non_zero_latency_packets)[channel_id] == 0) {
    (*zero_latency_packets)[channel_id] = parallel_reduce * 1;
    (*non_zero_latency_packets)[channel_id] = get_non_zero_latency_packets();
    toggle = !toggle;
  }
  int current_receiver = f.dest;
  std::vector<int> current_sender = f.prev;
  if ((*zero_latency_packets)[channel_id] > 0) {
    NcclLog->writeLog(NcclLogLevel::DEBUG,"id:  %d (*zero_latency_packets)[channel_id] > 0",id);
    uint64_t message_size = f.flow_size;
    packets[std::make_pair(channel_id, flow_id)].push_back(MyPacket(
        stream->current_queue_id,
        current_sender[0], 
        current_receiver,
        message_size,
        channel_id,
        flow_id));
    packets[std::make_pair(channel_id, flow_id)].back().set_flow_id(flow_id);
    packets[std::make_pair(channel_id, flow_id)].back().sender = nullptr;
    processed = false;
    send_back = false;
    NPU_to_MA = true;
    release_packets(channel_id, flow_id, message_size);
    (*zero_latency_packets)[channel_id]--;
    NcclLog->writeLog(NcclLogLevel::DEBUG,"id:  %d (*zero_latency_packets)[channel_id] : %d ",id,(*zero_latency_packets)[channel_id]);
    return;
  } else if ((*non_zero_latency_packets)[channel_id] > 0) {
    NcclLog->writeLog(NcclLogLevel::DEBUG,"id:  %d (*non_zero_latency_packets)[channel_id] > 0",id);
    uint64_t message_size = f.flow_size;
    packets[std::make_pair(channel_id, flow_id)].push_back(MyPacket(
        stream->current_queue_id,
        current_sender[0],  
        current_receiver,
        message_size,
        channel_id,
        flow_id)); 
    packets[std::make_pair(channel_id, flow_id)].back().set_flow_id(flow_id);
    packets[std::make_pair(channel_id, flow_id)].back().sender = nullptr;
    if (comType == ComType::Reduce_Scatter ||
        (comType == ComType::All_Reduce && toggle)) {
      processed = true;
    } else {
      processed = false;
    }
    if ((*non_zero_latency_packets)[channel_id] <= parallel_reduce * 1) {
      send_back = false;
    } else {
      send_back = true;
    }
    NPU_to_MA = false;
    release_packets(channel_id, flow_id, message_size);
    (*non_zero_latency_packets)[channel_id]--;
    NcclLog->writeLog(NcclLogLevel::DEBUG,"id:  %d (*non_zero_latency_packets)[channel_id] : %d ",id,(*non_zero_latency_packets)[channel_id]);
    return;
  }
  Sys::sys_panic("should not inject nothing!");
}