in platform/consensus/ordering/pbft/performance_manager.cpp [251:291]
// Consumer loop: drains the client request queue, groups requests into
// batches of ClientBatchNum(), and submits each batch via DoBatch().
// Blocks until eval_ready_future_ is signaled, then runs until stop_ is set.
// Returns 0 on normal shutdown.
int PerformanceManager::BatchProposeMsg() {
  LOG(WARNING) << "batch wait time:" << config_.ClientBatchWaitTimeMS()
               << " batch num:" << config_.ClientBatchNum()
               << " max txn:" << config_.GetMaxProcessTxn();
  std::vector<std::unique_ptr<QueueItem>> batch_req;
  // Wait for the evaluation phase to be ready before proposing anything.
  eval_ready_future_.get();
  while (!stop_) {
    // std::lock_guard<std::mutex> lk(mutex_);
    // Back-pressure: if the primary already has the maximum number of
    // in-flight transactions, pause before trying again.
    if (send_num_[GetPrimary()] >= config_.GetMaxProcessTxn()) {
      usleep(100000);
      continue;
    }
    if (batch_req.size() < config_.ClientBatchNum()) {
      // Pop with a timeout so the loop can re-check stop_ and also flush a
      // partially filled batch after ClientBatchWaitTimeMS() of inactivity.
      std::unique_ptr<QueueItem> item =
          batch_queue_.Pop(config_.ClientBatchWaitTimeMS());
      if (item == nullptr) {
        continue;
      }
      batch_req.push_back(std::move(item));
      if (batch_req.size() < config_.ClientBatchNum()) {
        continue;
      }
    }
    int ret = DoBatch(batch_req);
    if (ret != 0) {
      // Batch submission failed: notify every client in the batch with an
      // ERROR response. This must happen BEFORE clearing batch_req —
      // previously the vector was cleared first, so this loop iterated over
      // an empty vector and no error replies were ever delivered.
      Response response;
      response.set_result(Response::ERROR);
      for (size_t i = 0; i < batch_req.size(); ++i) {
        if (batch_req[i]->context && batch_req[i]->context->client) {
          int send_ret = batch_req[i]->context->client->SendRawMessage(response);
          if (send_ret) {
            LOG(ERROR) << "send resp" << response.DebugString()
                       << " fail ret:" << send_ret;
          }
        }
      }
    }
    // Reuse the vector's capacity for the next batch.
    batch_req.clear();
  }
  return 0;
}