void ReplicaCommunicator::StartSingleInBackGround()

in platform/networkstrate/replica_communicator.cpp [127:169]


void ReplicaCommunicator::StartSingleInBackGround(const std::string& ip, int port) {
  single_bq_[std::make_pair(ip,port)] = std::make_unique<BatchQueue<std::unique_ptr<QueueItem>>>("s_batch", tcp_batch_);

  ReplicaInfo replica_info;
  for (const auto& replica : replicas_) {
    if (replica.ip() == ip && replica.port() == port) {
      replica_info = replica;
      break;
    }
  }

  if (replica_info.ip().empty()) {
    for (const auto& replica : GetClientReplicas()) {
      if (replica.ip() == ip && replica.port() == port) {
        replica_info = replica;
        break;
      }
    }
  }


  single_thread_.push_back(std::thread([&](BatchQueue<std::unique_ptr<QueueItem>> *bq, ReplicaInfo replica_info) {
    while (IsRunning()) {
      std::vector<std::unique_ptr<QueueItem>> batch_req =
          bq->Pop(50000);
      if (batch_req.empty()) {
        continue;
      }
      BroadcastData broadcast_data;
      for (auto& queue_item : batch_req) {
        broadcast_data.add_data()->swap(queue_item->data);
      }

      global_stats_->SendBroadCastMsg(broadcast_data.data_size());
      //LOG(ERROR)<<" send to ip:"<<replica_info.ip()<<" port:"<<replica_info.port()<<" bq size:"<<batch_req.size();
      int ret = SendMessageFromPool(broadcast_data, {replica_info});
      if (ret < 0) {
        LOG(ERROR) << "broadcast request fail:";
      }
      //LOG(ERROR)<<" send to ip:"<<replica_info.ip()<<" port:"<<replica_info.port()<<" bq size:"<<batch_req.size()<<" done";
    }
  }, single_bq_[std::make_pair(ip,port)].get(), replica_info));
}