in platform/consensus/execution/transaction_executor.cpp [436:496]
void TransactionExecutor::PrepareMessage() {
while (!IsStop()) {
std::unique_ptr<Request> request = prepare_queue_.Pop();
if (request == nullptr) {
continue;
}
uint64_t uid = request->uid();
int current_f = SetFlag(uid, Start_Prepare);
if (current_f == 0) {
// commit has done
// LOG(ERROR)<<" want prepare, commit started:"<<uid;
// ClearPromise(uid);
continue;
}
std::promise<int>* p = GetPromise(uid) ;
assert(p);
//LOG(ERROR)<<" prepare started:"<<uid;
// LOG(ERROR)<<" prepare uid:"<<uid;
// Execute the request, then send the response back to the user.
std::unique_ptr<BatchUserRequest> batch_request =
std::make_unique<BatchUserRequest>();
if (!batch_request->ParseFromString(request->data())) {
LOG(ERROR) << "parse data fail";
}
// batch_request = std::make_unique<BatchUserRequest>();
batch_request->set_seq(request->seq());
batch_request->set_hash(request->hash());
batch_request->set_proxy_id(request->proxy_id());
if (request->has_committed_certs()) {
*batch_request->mutable_committed_certs() = request->committed_certs();
}
// LOG(ERROR)<<"prepare seq:"<<batch_request->seq()<<" proxy
// id:"<<request->proxy_id()<<" local id:"<<batch_request->local_id();
std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>>
request_v = transaction_manager_->Prepare(*batch_request);
{
std::unique_lock<std::mutex> lk(fd_mutex_[uid % mod]);
// assert(request_v);
// assert(data_[uid%mod].find(uid) == data_[uid%mod].end());
data_[uid%mod][uid] = std::move(request_v);
req_[uid % mod][uid] = std::move(batch_request);
}
//LOG(ERROR)<<"set promise:"<<uid;
p->set_value(1);
{
int set_ret = SetFlag(uid, End_Prepare);
if (set_ret == 0) {
// LOG(ERROR)<<"commit interrupt:"<<uid;
//ClearPromise(uid);
} else {
//LOG(ERROR)<<"prepare done:"<<uid;
}
}
}
}