in platform/networkstrate/replica_communicator.cpp [127:169]
void ReplicaCommunicator::StartSingleInBackGround(const std::string& ip, int port) {
single_bq_[std::make_pair(ip,port)] = std::make_unique<BatchQueue<std::unique_ptr<QueueItem>>>("s_batch", tcp_batch_);
ReplicaInfo replica_info;
for (const auto& replica : replicas_) {
if (replica.ip() == ip && replica.port() == port) {
replica_info = replica;
break;
}
}
if (replica_info.ip().empty()) {
for (const auto& replica : GetClientReplicas()) {
if (replica.ip() == ip && replica.port() == port) {
replica_info = replica;
break;
}
}
}
single_thread_.push_back(std::thread([&](BatchQueue<std::unique_ptr<QueueItem>> *bq, ReplicaInfo replica_info) {
while (IsRunning()) {
std::vector<std::unique_ptr<QueueItem>> batch_req =
bq->Pop(50000);
if (batch_req.empty()) {
continue;
}
BroadcastData broadcast_data;
for (auto& queue_item : batch_req) {
broadcast_data.add_data()->swap(queue_item->data);
}
global_stats_->SendBroadCastMsg(broadcast_data.data_size());
//LOG(ERROR)<<" send to ip:"<<replica_info.ip()<<" port:"<<replica_info.port()<<" bq size:"<<batch_req.size();
int ret = SendMessageFromPool(broadcast_data, {replica_info});
if (ret < 0) {
LOG(ERROR) << "broadcast request fail:";
}
//LOG(ERROR)<<" send to ip:"<<replica_info.ip()<<" port:"<<replica_info.port()<<" bq size:"<<batch_req.size()<<" done";
}
}, single_bq_[std::make_pair(ip,port)].get(), replica_info));
}