in platform/consensus/execution/transaction_executor.cpp [256:347]
void TransactionExecutor::Execute(std::unique_ptr<Request> request,
bool need_execute) {
RegisterExecute(request->seq());
std::unique_ptr<BatchUserRequest> batch_request = nullptr;
std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>> data;
std::vector<std::unique_ptr<google::protobuf::Message>> * data_p = nullptr;
BatchUserRequest* batch_request_p = nullptr;
// Execute the request, then send the response back to the user.
if (batch_request_p == nullptr) {
batch_request = std::make_unique<BatchUserRequest>();
if (!batch_request->ParseFromString(request->data())) {
LOG(ERROR) << "parse data fail";
}
batch_request->set_hash(request->hash());
if (request->has_committed_certs()) {
*batch_request->mutable_committed_certs() = request->committed_certs();
}
batch_request->set_seq(request->seq());
batch_request->set_proxy_id(request->proxy_id());
batch_request_p = batch_request.get();
// LOG(ERROR)<<"get data from req:";
} else {
assert(batch_request_p);
batch_request_p->set_seq(request->seq());
batch_request_p->set_proxy_id(request->proxy_id());
// LOG(ERROR)<<" get from cache:"<<uid;
}
assert(batch_request_p);
// LOG(INFO) << " get request batch size:"
// << batch_request.user_requests_size()<<" proxy id:"
// <<request->proxy_id()<<" need execute:"<<need_execute;
std::unique_ptr<BatchUserResponse> response;
global_stats_->GetTransactionDetails(*batch_request_p);
if (transaction_manager_ && need_execute) {
if (execute_thread_num_ == 1) {
response = transaction_manager_->ExecuteBatch(*batch_request_p);
} else {
std::vector<std::unique_ptr<std::string>> response_v;
if(data_p == nullptr) {
int64_t start_time = GetCurrentTime();
data = std::move(transaction_manager_->Prepare(*batch_request_p));
int64_t end_time = GetCurrentTime();
if (end_time - start_time > 10) {
// LOG(ERROR)<<"exec data done:"<<uid<<" wait
// time:"<<(end_time-start_time);
}
data_p = data.get();
}
WaitForExecute(request->seq());
if(data_p->empty() || (*data_p)[0] == nullptr){
response = transaction_manager_->ExecuteBatch(*batch_request_p);
}
else {
response_v = transaction_manager_->ExecuteBatchData(*data_p);
}
FinishExecute(request->seq());
if(response == nullptr){
response = std::make_unique<BatchUserResponse>();
for (auto& s : response_v) {
response->add_response()->swap(*s);
}
}
}
}
// LOG(ERROR)<<" CF = :"<<(cf==1)<<" uid:"<<uid;
if (duplicate_manager_ && batch_request_p) {
duplicate_manager_->AddExecuted(batch_request_p->hash(), batch_request_p->seq());
}
if (response == nullptr) {
response = std::make_unique<BatchUserResponse>();
}
global_stats_->IncTotalRequest(batch_request_p->user_requests_size());
response->set_proxy_id(batch_request_p->proxy_id());
response->set_createtime(batch_request_p->createtime() + request->queuing_time());
response->set_local_id(batch_request_p->local_id());
response->set_seq(request->seq());
if (post_exec_func_) {
post_exec_func_(std::move(request), std::move(response));
}
global_stats_->IncExecuteDone();
}