platform/consensus/ordering/pbft/transaction_collector.cpp (195 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "platform/consensus/ordering/pbft/transaction_collector.h"
#include <glog/logging.h>
#include "common/crypto/signature_verifier.h"
namespace resdb {
uint64_t TransactionCollector::Seq() { return seq_; }
bool TransactionCollector::IsPrepared() { return is_prepared_; }
TransactionStatue TransactionCollector::GetStatus() const { return status_; }
int TransactionCollector::SetContextList(
uint64_t seq, std::vector<std::unique_ptr<Context>> context) {
if (seq != seq_) {
return -2;
}
context_list_ = std::move(context);
return 0;
}
bool TransactionCollector::HasClientContextList(uint64_t seq) const {
if (seq != seq_) {
return false;
}
return !context_list_.empty();
}
std::vector<std::unique_ptr<Context>> TransactionCollector::FetchContextList(
uint64_t seq) {
if (seq != seq_) {
return std::vector<std::unique_ptr<Context>>();
}
return std::move(context_list_);
}
std::vector<RequestInfo> TransactionCollector::GetPreparedProof() {
std::vector<RequestInfo> prepared_info;
for (const auto& proof : prepared_proof_) {
RequestInfo info;
info.signature = proof->signature;
info.request = std::make_unique<Request>(*proof->request);
prepared_info.push_back(std::move(info));
}
return prepared_info;
}
int TransactionCollector::AddRequest(
std::unique_ptr<Request> request, const SignatureInfo& signature,
bool is_main_request,
std::function<void(const Request&, int received_count, CollectorDataType*,
std::atomic<TransactionStatue>* status, bool force)>
call_back) {
if (request == nullptr) {
LOG(ERROR) << "request empty";
return -2;
}
int32_t sender_id = request->sender_id();
std::string hash = request->hash();
int type = request->type();
uint64_t seq = request->seq();
uint64_t view = request->current_view();
if (is_committed_) {
return -2;
}
if (status_.load() == EXECUTED) {
return -2;
}
if (seq_ != static_cast<uint64_t>(request->seq())) {
// LOG(ERROR) << "data invalid, seq not the same:" << seq
// << " collect seq:" << seq_;
return -2;
}
if (is_main_request) {
auto request_info = std::make_unique<RequestInfo>();
request_info->signature = signature;
request_info->request = std::move(request);
bool force = false;
if (view_ && view_ < view && !is_prepared_) {
force = true;
atomic_mian_request_.Clear();
}
int ret = atomic_mian_request_.Set(request_info);
if (!ret) {
other_main_request_.insert(std::move(request_info));
LOG(ERROR) << "set main request fail: data existed:" << seq
<< " ret:" << ret;
return -2;
}
auto main_request = atomic_mian_request_.Reference();
if (main_request->request == nullptr) {
LOG(ERROR) << "set main request data fail";
return -2;
}
view_ = view;
call_back(*main_request->request.get(), 1, nullptr, &status_, force);
return 0;
} else {
if (enable_viewchange_) {
if (type == Request::TYPE_PREPARE) {
if (status_.load() <= TransactionStatue::READY_PREPARE) {
auto request_info = std::make_unique<RequestInfo>();
request_info->signature = signature;
request_info->request = std::make_unique<Request>(*request);
std::lock_guard<std::mutex> lk(mutex_);
if (is_prepared_) {
return 0;
}
prepared_proof_.push_back(std::move(request_info));
if (senders_[type].count(hash) == 0) {
senders_[type].insert(std::make_pair(hash, std::bitset<128>()));
}
senders_[type][hash][sender_id] = 1;
call_back(*request, senders_[type][hash].count(), nullptr, &status_,
false);
if (status_.load() == TransactionStatue::READY_COMMIT) {
is_prepared_ = true;
if (atomic_mian_request_.Reference() != nullptr &&
atomic_mian_request_.Reference()->request->hash() != hash) {
atomic_mian_request_.Clear();
for (auto it = other_main_request_.begin();
it != other_main_request_.end(); it++) {
if ((*it)->request->hash() == hash) {
auto request_info = std::make_unique<RequestInfo>();
request_info->signature = (*it)->signature;
request_info->request = std::move((*it)->request);
atomic_mian_request_.Set(request_info);
break;
}
}
other_main_request_.clear();
}
int pos = 0;
for (size_t i = 0; i < prepared_proof_.size(); i++) {
if (prepared_proof_[i]->request->hash() == hash) {
prepared_proof_[pos++] = std::move(prepared_proof_[i]);
}
}
prepared_proof_.erase(prepared_proof_.begin() + pos,
prepared_proof_.end());
}
}
return 0;
}
}
if (request->type() == Request::TYPE_COMMIT) {
if (request->has_data_signature() &&
request->data_signature().node_id() > 0) {
std::lock_guard<std::mutex> lk(mutex_);
LOG(ERROR) << "add qc signature";
commit_certs_.push_back(request->data_signature());
}
}
{
std::lock_guard<std::mutex> lk(mutex_);
if (senders_[type].count(hash) == 0) {
senders_[type].insert(std::make_pair(hash, std::bitset<128>()));
}
senders_[type][hash][sender_id] = 1;
call_back(*request, senders_[type][hash].count(), nullptr, &status_,
false);
}
if (status_.load() == TransactionStatue::READY_EXECUTE) {
Commit();
return 1;
}
}
return 0;
}
int TransactionCollector::Commit() {
TransactionStatue old_status = TransactionStatue::READY_EXECUTE;
bool res = status_.compare_exchange_strong(
old_status, TransactionStatue::EXECUTED, std::memory_order_acq_rel,
std::memory_order_acq_rel);
if (!res) {
return -2;
}
auto main_request = atomic_mian_request_.Reference();
if (main_request == nullptr) {
LOG(ERROR) << "no main";
return -2;
}
is_committed_ = true;
if (executor_ && main_request->request) {
if (!commit_certs_.empty()) {
for (const auto& sig : commit_certs_) {
*main_request->request->mutable_committed_certs()
->add_committed_certs() = sig;
// LOG(ERROR) << "add sig:" << sig.DebugString();
}
}
executor_->Commit(std::move(main_request->request));
}
return 0;
}
std::vector<std::string> TransactionCollector::GetAllStoredHash() {
std::vector<std::string> v;
auto main_request = atomic_mian_request_.Reference();
if (main_request) {
v.push_back(main_request->request->hash());
}
for (auto& info : other_main_request_) {
v.push_back(info->request->hash());
}
return v;
}
} // namespace resdb