in platform/consensus/ordering/pbft/response_manager.cpp [167:211]
// Records one response message in the collector selected by the response's
// local id and reports whether that caused a status change.
//
// Parameters:
//   signature          - signature info forwarded unchanged to the collector.
//   request            - the response message; its data() is parsed as a
//                        BatchUserResponse. Ownership is handed to the
//                        collector once parsing succeeds.
//   response_call_back - invoked with the request and collector data when
//                        MayConsensusChangeStatus() returns true.
//
// Returns:
//   INVALID       - request is null, its payload fails to parse (the waiting
//                   entry for its hash is removed in that case), or the
//                   collector's AddRequest() returns non-zero.
//   STATE_CHANGED - the callback fired; the collector pool is updated for the
//                   sequence and the waiting entry for the hash is removed.
//   OK            - the message was accepted without a status change.
CollectorResultCode ResponseManager::AddResponseMsg(
    const SignatureInfo& signature, std::unique_ptr<Request> request,
    std::function<void(const Request&,
                       const TransactionCollector::CollectorDataType*)>
        response_call_back) {
  if (!request) {
    return CollectorResultCode::INVALID;
  }

  // Take a copy of the hash up front: `request` is moved into the collector
  // below, but the hash is still needed afterwards.
  std::string request_hash = request->hash();

  BatchUserResponse parsed_response;
  if (!parsed_response.ParseFromString(request->data())) {
    LOG(ERROR) << "parse response fail:" << request->data().size()
               << " seq:" << request->seq();
    RemoveWaitingResponseRequest(request_hash);
    return CollectorResultCode::INVALID;
  }

  // The collector is keyed by the local id carried inside the response, so the
  // request's sequence is overwritten with it before it is handed over.
  uint64_t local_seq = parsed_response.local_id();
  request->set_seq(local_seq);

  // Read the type before the move; the callback needs it afterwards.
  int request_type = request->type();

  bool status_changed = false;
  // NOTE(review): everything is captured by reference, so this presumes
  // AddRequest() runs the callback before it returns and does not keep it
  // around afterwards — confirm against TransactionCollector.
  auto on_collected = [&](const Request& collected, int received_count,
                          TransactionCollector::CollectorDataType* data,
                          std::atomic<TransactionStatue>* status, bool force) {
    if (!MayConsensusChangeStatus(request_type, received_count, status)) {
      return;
    }
    status_changed = true;
    response_call_back(collected, data);
  };

  int add_result = collector_pool_->GetCollector(local_seq)->AddRequest(
      std::move(request), signature, false, on_collected);
  if (add_result != 0) {
    return CollectorResultCode::INVALID;
  }

  if (!status_changed) {
    return CollectorResultCode::OK;
  }

  collector_pool_->Update(local_seq);
  RemoveWaitingResponseRequest(request_hash);
  return CollectorResultCode::STATE_CHANGED;
}