platform/consensus/ordering/pbft/transaction_collector.h (97 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include <bitset>
#include "platform/consensus/execution/transaction_executor.h"
#include "platform/networkstrate/server_comm.h"
#include "platform/proto/resdb.pb.h"
#include "platform/statistic/stats.h"
namespace resdb {
enum TransactionStatue {
None = 0,
Prepare = -999,
READY_PREPARE = 1,
READY_COMMIT = 2,
READY_EXECUTE = 3,
EXECUTED = 4,
};
struct RequestInfo {
std::unique_ptr<Request> request;
SignatureInfo signature;
};
template <typename T>
class AtomicUniquePtr {
public:
AtomicUniquePtr() : v_(0) {}
bool Set(std::unique_ptr<T>& new_ptr) {
int old_v = 0;
bool res = v_.compare_exchange_strong(old_v, 1, std::memory_order_acq_rel,
std::memory_order_acq_rel);
if (!res) {
return false;
}
ptr_ = std::move(new_ptr);
v_ = 2;
return true;
}
T* Reference() {
int v = v_.load(std::memory_order_acq_rel);
if (v <= 1) {
return nullptr;
}
return ptr_.get();
}
void Clear() {
v_ = 0;
ptr_ = nullptr;
}
private:
std::unique_ptr<T> ptr_;
std::atomic<int> v_;
};
class TransactionCollector {
public:
TransactionCollector(uint64_t seq, TransactionExecutor* executor,
bool enable_viewchange = false)
: seq_(seq),
executor_(executor),
status_(TransactionStatue::None),
enable_viewchange_(enable_viewchange),
view_(0) {}
~TransactionCollector() = default;
// TODO split the context list.
// context contains the client channel used for sending back the response.
int SetContextList(uint64_t seq,
std::vector<std::unique_ptr<Context>> context);
bool HasClientContextList(uint64_t seq) const;
std::vector<std::unique_ptr<Context>> FetchContextList(uint64_t seq);
typedef std::list<std::unique_ptr<RequestInfo>> CollectorDataType;
// Add a message and count by its hash value.
// After it is done call_back will be triggered.
int AddRequest(
std::unique_ptr<Request> request, const SignatureInfo& signature,
bool is_main_request,
std::function<void(const Request&, int received_count,
CollectorDataType* data,
std::atomic<TransactionStatue>* status, bool force)>
call_back);
std::vector<RequestInfo> GetPreparedProof();
TransactionStatue GetStatus() const;
uint64_t Seq();
bool IsPrepared();
std::vector<std::string> GetAllStoredHash();
private:
int Commit();
private:
uint64_t seq_;
TransactionExecutor* executor_;
std::atomic<bool> is_committed_ = false;
std::atomic<bool> is_prepared_ = false;
std::vector<std::unique_ptr<Context>> context_list_;
std::map<std::string, std::list<std::unique_ptr<RequestInfo>>>
data_[Request::NUM_OF_TYPE];
std::vector<std::unique_ptr<RequestInfo>> prepared_proof_;
AtomicUniquePtr<RequestInfo> atomic_mian_request_;
std::atomic<TransactionStatue> status_ = TransactionStatue::None;
bool enable_viewchange_;
std::mutex mutex_;
std::vector<SignatureInfo> commit_certs_;
std::map<std::string, std::bitset<128>> senders_[Request::NUM_OF_TYPE];
std::set<std::unique_ptr<RequestInfo>> other_main_request_;
uint64_t view_;
};
} // namespace resdb