in platform/consensus/ordering/pbft/transaction_collector.cpp [69:195]
int TransactionCollector::AddRequest(
std::unique_ptr<Request> request, const SignatureInfo& signature,
bool is_main_request,
std::function<void(const Request&, int received_count, CollectorDataType*,
std::atomic<TransactionStatue>* status, bool force)>
call_back) {
if (request == nullptr) {
LOG(ERROR) << "request empty";
return -2;
}
int32_t sender_id = request->sender_id();
std::string hash = request->hash();
int type = request->type();
uint64_t seq = request->seq();
uint64_t view = request->current_view();
if (is_committed_) {
return -2;
}
if (status_.load() == EXECUTED) {
return -2;
}
if (seq_ != static_cast<uint64_t>(request->seq())) {
// LOG(ERROR) << "data invalid, seq not the same:" << seq
// << " collect seq:" << seq_;
return -2;
}
if (is_main_request) {
auto request_info = std::make_unique<RequestInfo>();
request_info->signature = signature;
request_info->request = std::move(request);
bool force = false;
if (view_ && view_ < view && !is_prepared_) {
force = true;
atomic_mian_request_.Clear();
}
int ret = atomic_mian_request_.Set(request_info);
if (!ret) {
other_main_request_.insert(std::move(request_info));
LOG(ERROR) << "set main request fail: data existed:" << seq
<< " ret:" << ret;
return -2;
}
auto main_request = atomic_mian_request_.Reference();
if (main_request->request == nullptr) {
LOG(ERROR) << "set main request data fail";
return -2;
}
view_ = view;
call_back(*main_request->request.get(), 1, nullptr, &status_, force);
return 0;
} else {
if (enable_viewchange_) {
if (type == Request::TYPE_PREPARE) {
if (status_.load() <= TransactionStatue::READY_PREPARE) {
auto request_info = std::make_unique<RequestInfo>();
request_info->signature = signature;
request_info->request = std::make_unique<Request>(*request);
std::lock_guard<std::mutex> lk(mutex_);
if (is_prepared_) {
return 0;
}
prepared_proof_.push_back(std::move(request_info));
if (senders_[type].count(hash) == 0) {
senders_[type].insert(std::make_pair(hash, std::bitset<128>()));
}
senders_[type][hash][sender_id] = 1;
call_back(*request, senders_[type][hash].count(), nullptr, &status_,
false);
if (status_.load() == TransactionStatue::READY_COMMIT) {
is_prepared_ = true;
if (atomic_mian_request_.Reference() != nullptr &&
atomic_mian_request_.Reference()->request->hash() != hash) {
atomic_mian_request_.Clear();
for (auto it = other_main_request_.begin();
it != other_main_request_.end(); it++) {
if ((*it)->request->hash() == hash) {
auto request_info = std::make_unique<RequestInfo>();
request_info->signature = (*it)->signature;
request_info->request = std::move((*it)->request);
atomic_mian_request_.Set(request_info);
break;
}
}
other_main_request_.clear();
}
int pos = 0;
for (size_t i = 0; i < prepared_proof_.size(); i++) {
if (prepared_proof_[i]->request->hash() == hash) {
prepared_proof_[pos++] = std::move(prepared_proof_[i]);
}
}
prepared_proof_.erase(prepared_proof_.begin() + pos,
prepared_proof_.end());
}
}
return 0;
}
}
if (request->type() == Request::TYPE_COMMIT) {
if (request->has_data_signature() &&
request->data_signature().node_id() > 0) {
std::lock_guard<std::mutex> lk(mutex_);
LOG(ERROR) << "add qc signature";
commit_certs_.push_back(request->data_signature());
}
}
{
std::lock_guard<std::mutex> lk(mutex_);
if (senders_[type].count(hash) == 0) {
senders_[type].insert(std::make_pair(hash, std::bitset<128>()));
}
senders_[type][hash][sender_id] = 1;
call_back(*request, senders_[type][hash].count(), nullptr, &status_,
false);
}
if (status_.load() == TransactionStatue::READY_EXECUTE) {
Commit();
return 1;
}
}
return 0;
}