platform/networkstrate/consensus_manager.cpp (283 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "platform/networkstrate/consensus_manager.h"
#include <glog/logging.h>
#include <unistd.h>
#include "platform/proto/broadcast.pb.h"
namespace resdb {
namespace {
bool ReplicaExisted(const ReplicaInfo& replica_info,
const std::vector<ReplicaInfo>& replicas) {
for (auto& rep : replicas) {
if (rep.id() == replica_info.id()) {
return true;
}
}
return false;
}
} // namespace
// Builds the manager from `config`. When signature verification is enabled a
// SignatureVerifier is created first, because GetReplicaClient() hands
// verifier_ to the long-connection broadcast client built right after it.
ConsensusManager::ConsensusManager(const ResDBConfig& config)
    : config_(config), global_stats_(Stats::GetGlobalStats()) {
  if (config_.SignatureVerifierEnabled()) {
    verifier_ = std::make_unique<SignatureVerifier>(
        config_.GetPrivateKey(), config_.GetPublicKeyCertificateInfo());
  }
  const bool use_long_connection = true;
  const auto& configured_replicas = config_.GetReplicaInfos();
  bc_client_ = GetReplicaClient(configured_replicas, use_long_connection);
}
// Tears down in reverse order of construction/Start(): stop the service and
// join the heart-beat thread first, then release the broadcast client, so the
// background thread is gone before bc_client_ is reset.
ConsensusManager::~ConsensusManager() {
  Stop();
  bc_client_.reset();
}
void ConsensusManager::UpdateBroadCastClient() {
bc_client_ = GetReplicaClient(GetReplicas(), true);
}
// Returns a non-owning pointer to the broadcast client; the manager keeps
// ownership and the pointer changes whenever UpdateBroadCastClient() runs.
ReplicaCommunicator* ConsensusManager::GetBroadCastClient() {
  ReplicaCommunicator* const broadcast_client = bc_client_.get();
  return broadcast_client;
}
// Returns the signature verifier, or nullptr when signature verification was
// not enabled in the config (the constructor only creates one in that case).
// unique_ptr::get() already yields nullptr for an empty pointer.
SignatureVerifier* ConsensusManager::GetSignatureVerifier() {
  return verifier_.get();
}
// Reports is_ready_, which ProcessHeartBeat() sets once a single heart beat
// carries at least GetMinDataReceiveNum() same-region replica keys.
bool ConsensusManager::IsReady() const {
  return is_ready_;
}
// Stops the service and, if a heart-beat thread was started, waits for it
// to exit.
void ConsensusManager::Stop() {
  ServiceInterface::Stop();
  if (!heartbeat_thread_.joinable()) {
    return;
  }
  heartbeat_thread_.join();
}
void ConsensusManager::Start() {
ServiceInterface::Start();
if (config_.HeartBeatEnabled() && verifier_) {
heartbeat_thread_ =
std::thread(&ConsensusManager::HeartBeat, this); // pass by reference
}
}
// Background loop started by Start(): keeps broadcasting this node's public
// keys to the other nodes (via SendHeartBeat(), under hb_mutex_) until the
// service stops. Heart beats go out every second until is_ready_ is set;
// afterwards every 60 seconds (every second in test mode).
void ConsensusManager::HeartBeat() {
  LOG(INFO) << "heart beat start";
  // Nothing can notify this thread, so the wait between heart beats is split
  // into short slices that re-check IsRunning(); this keeps Stop() from
  // blocking for a whole (up to 60s) interval while it joins the thread.
  constexpr std::chrono::milliseconds kStopPollInterval(100);
  const std::chrono::steady_clock::duration poll_slice = kStopPollInterval;
  std::chrono::seconds interval(1);
  while (IsRunning()) {
    {
      std::unique_lock<std::mutex> lk(hb_mutex_);
      SendHeartBeat();
    }
    const auto deadline = std::chrono::steady_clock::now() + interval;
    while (IsRunning()) {
      const std::chrono::steady_clock::duration remaining =
          deadline - std::chrono::steady_clock::now();
      if (remaining <= std::chrono::steady_clock::duration::zero()) {
        break;
      }
      std::this_thread::sleep_for(remaining < poll_slice ? remaining
                                                         : poll_slice);
    }
    if (is_ready_) {
      interval = std::chrono::seconds(config_.IsTestMode() ? 1 : 60);
    }
  }
}
void ConsensusManager::SendHeartBeat() {
auto keys = verifier_->GetAllPublicKeys();
std::vector<ReplicaInfo> replicas = GetAllReplicas();
LOG(ERROR) << "all replicas:" << replicas.size();
std::vector<ReplicaInfo> client_replicas = GetClientReplicas();
HeartBeatInfo hb_info;
hb_info.set_sender(config_.GetSelfInfo().id());
hb_info.set_ip(config_.GetSelfInfo().ip());
hb_info.set_port(config_.GetSelfInfo().port());
hb_info.set_hb_version(version_);
for (const auto& key : keys) {
*hb_info.add_public_keys() = key;
hb_info.add_node_version(hb_[key.public_key_info().node_id()]);
}
for (const auto& client : client_replicas) {
replicas.push_back(client);
}
auto client = GetReplicaClient(replicas, false);
if (client == nullptr) {
return;
}
// If it is not a client node, broadcost the current primary to the client.
if (config_.GetPublicKeyCertificateInfo()
.public_key()
.public_key_info()
.type() == CertificateKeyInfo::REPLICA) {
hb_info.set_primary(GetPrimary());
hb_info.set_version(GetVersion());
}
LOG(ERROR) << " server:" << config_.GetSelfInfo().id() << " sends HB"
<< " is ready:" << is_ready_
<< " client size:" << client_replicas.size()
<< " svr size:" << replicas.size();
Request request;
request.set_type(Request::TYPE_HEART_BEAT);
request.mutable_region_info()->set_region_id(
config_.GetConfigData().self_region_id());
hb_info.SerializeToString(request.mutable_data());
int ret = client->SendHeartBeat(request);
if (ret <= 0) {
LOG(ERROR) << " server:" << config_.GetSelfInfo().id()
<< " sends HB fail:" << ret;
}
}
// Porcess the packages received from the network.
// context contains the client socket which can be used for sending response to
// the client, the signature for the request will be filled inside the context
// when parsing the message.
int ConsensusManager::Process(std::unique_ptr<Context> context,
std::unique_ptr<DataInfo> request_info) {
global_stats_->IncClientCall();
// Decode the whole message, it includes the certificate and data.
ResDBMessage message;
if (!message.ParseFromArray(request_info->buff, request_info->data_len)) {
LOG(ERROR) << "parse data info fail";
return -1;
}
std::unique_ptr<Request> request = std::make_unique<Request>();
if (!request->ParseFromString(message.data())) {
LOG(ERROR) << "parse data info fail";
return -1;
}
if (request->type() == Request::TYPE_HEART_BEAT) {
return Dispatch(std::move(context), std::move(request));
}
// Check if the certificate is valid.
if (message.has_signature() && verifier_) {
bool valid = verifier_->VerifyMessage(message.data(), message.signature());
if (!valid) {
LOG(ERROR) << "request is not valid:"
<< message.signature().DebugString();
LOG(ERROR) << " msg:" << message.data().size()
<< " is recovery:" << request->is_recovery();
return -2;
}
} else {
}
// forward the signature to the request so that it can be included in the
// request/response set if needed.
context->signature = message.signature();
// LOG(ERROR) << "======= server:" << config_.GetSelfInfo().id()
// << " get request type:" << request->type()
// << " from:" << request->sender_id();
return Dispatch(std::move(context), std::move(request));
}
// Routes a decoded request: heart beats from other nodes are handled locally
// by ProcessHeartBeat(); every other request type is handed to
// ConsensusCommit(). Returns whatever the chosen handler returns.
int ConsensusManager::Dispatch(std::unique_ptr<Context> context,
                               std::unique_ptr<Request> request) {
  const bool is_heart_beat = request->type() == Request::TYPE_HEART_BEAT;
  if (!is_heart_beat) {
    return ConsensusCommit(std::move(context), std::move(request));
  }
  return ProcessHeartBeat(std::move(context), std::move(request));
}
// Handles a heart beat received from another node. Runs entirely under
// hb_mutex_.
//  - On a CLIENT node, a heart beat from the same region updates the primary
//    via SetPrimary() (no 2f+1 quorum is collected yet, see TODO).
//  - Every advertised public key is registered with the verifier (if any).
//    Same-region keys that carry an ip are counted when they belong to a
//    replica, or recorded via AddNewClient() when they belong to an unknown
//    client.
//  - A non-zero hb_version that differs from the last one recorded for the
//    sender is stored and answered immediately with SendHeartBeat().
//  - The node becomes ready once a single heart beat carries at least
//    GetMinDataReceiveNum() same-region replica keys.
// `context` is not used. Returns -1 if the payload cannot be parsed, else 0.
int ConsensusManager::ProcessHeartBeat(std::unique_ptr<Context> context,
                                       std::unique_ptr<Request> request) {
  std::unique_lock<std::mutex> lk(hb_mutex_);
  HeartBeatInfo hb_info;
  if (!hb_info.ParseFromString(request->data())) {
    LOG(ERROR) << "parse replica info fail\n";
    return -1;
  }

  // Single lookup of the sender's last recorded heart-beat version; a sender
  // seen for the first time gets a 0 entry.
  auto& last_hb_version = hb_[hb_info.sender()];
  const bool same_region = request->region_info().region_id() ==
                           config_.GetConfigData().self_region_id();

  LOG(ERROR) << "receive public size:" << hb_info.public_keys().size()
             << " primary:" << hb_info.primary()
             << " version:" << hb_info.version()
             << " from region:" << request->region_info().region_id()
             << " sender:" << hb_info.sender()
             << " last send:" << hb_info.hb_version()
             << " current v:" << last_hb_version;

  if (same_region && config_.GetPublicKeyCertificateInfo()
                             .public_key()
                             .public_key_info()
                             .type() == CertificateKeyInfo::CLIENT) {
    // TODO count 2f+1 before setting a new primary
    SetPrimary(hb_info.primary(), hb_info.version());
  }

  int replica_num = 0;
  // Update the public keys received from others.
  for (const auto& public_key : hb_info.public_keys()) {
    if (verifier_ && !verifier_->AddPublicKey(public_key)) {
      LOG(ERROR) << "set public key fail from:"
                 << public_key.public_key_info().node_id();
      continue;
    }
    // Keys from other regions are only registered with the verifier.
    if (!same_region) {
      continue;
    }
    const auto& key_info = public_key.public_key_info();
    ReplicaInfo info;
    info.set_ip(key_info.ip());
    info.set_port(key_info.port());
    info.set_id(key_info.node_id());
    if (info.ip().empty()) {
      LOG(ERROR) << "public doesn't have ip, skip";
      continue;
    }
    if (key_info.type() == CertificateKeyInfo::REPLICA) {
      replica_num++;
      // TODO: detect and notify a newly joined replica (AddNewReplica).
    } else if (!ReplicaExisted(info, clients_)) {
      AddNewClient(info);
    }
  }

  // A changed heart-beat version from the sender (presumably a restart --
  // confirm against how version_ is bumped) is answered with our own heart
  // beat so the sender receives our keys without waiting for the next tick.
  if (!hb_info.ip().empty() && hb_info.hb_version() > 0 &&
      last_hb_version != hb_info.hb_version()) {
    last_hb_version = hb_info.hb_version();
    SendHeartBeat();
  }

  if (!is_ready_ && replica_num >= config_.GetMinDataReceiveNum()) {
    LOG(ERROR) << "============ Server " << config_.GetSelfInfo().id()
               << " is ready "
                  "=====================";
    is_ready_ = true;
  }
  return 0;
}
// Base implementation: no consensus protocol is attached here, so the request
// is rejected with -1 (presumably overridden by concrete managers -- confirm).
int ConsensusManager::ConsensusCommit(std::unique_ptr<Context> context,
                                      std::unique_ptr<Request> request) {
  (void)context;
  (void)request;
  constexpr int kNotHandled = -1;
  return kNotHandled;
}
// Returns a copy of the client nodes recorded so far via AddNewClient().
std::vector<ReplicaInfo> ConsensusManager::GetClientReplicas() {
  std::vector<ReplicaInfo> snapshot(clients_);
  return snapshot;
}
std::vector<ReplicaInfo> ConsensusManager::GetAllReplicas() {
auto config_data = config_.GetConfigData();
std::vector<ReplicaInfo> ret;
for (const auto& r : config_data.region()) {
for (const auto& replica : r.replica_info()) {
ret.push_back(replica);
}
}
return ret;
}
// Sends `request` through the long-connection broadcast client; a negative
// return from the client is only logged.
void ConsensusManager::BroadCast(const Request& request) {
  const int send_result = bc_client_->SendMessage(request);
  if (send_result >= 0) {
    return;
  }
  LOG(ERROR) << "broadcast request fail:";
}
// Sends `message` to the single replica whose id is `node_id`, using the
// broadcast client. The message is silently dropped when no replica in
// GetReplicas() has that id or when the matching replica has no ip; a
// negative send result is only logged.
void ConsensusManager::SendMessage(const google::protobuf::Message& message,
                                   int64_t node_id) {
  std::vector<ReplicaInfo> replicas = GetReplicas();
  ReplicaInfo* target = nullptr;
  for (ReplicaInfo& candidate : replicas) {
    if (candidate.id() != node_id) {
      continue;
    }
    target = &candidate;
    break;
  }
  if (target == nullptr || target->ip().empty()) {
    return;
  }
  if (bc_client_->SendMessage(message, *target) < 0) {
    LOG(ERROR) << "broadcast request fail:";
  }
}
std::unique_ptr<ReplicaCommunicator> ConsensusManager::GetReplicaClient(
const std::vector<ReplicaInfo>& replicas, bool is_use_long_conn) {
return std::make_unique<ReplicaCommunicator>(
replicas,
verifier_ == nullptr || config_.GetConfigData().not_need_signature()
? nullptr
: verifier_.get(),
is_use_long_conn, config_.GetOutputWorkerNum(), config_.GetTcpBatchNum());
}
// Hook for admitting a newly discovered replica; this base implementation
// does nothing.
void ConsensusManager::AddNewReplica(const ReplicaInfo& info) {
  (void)info;
}
// Appends `info` to the known client list and pushes the whole updated list
// to the broadcast client.
void ConsensusManager::AddNewClient(const ReplicaInfo& info) {
  clients_.emplace_back(info);
  bc_client_->UpdateClientReplicas(clients_);
}
// Records a new primary and its version; this base implementation ignores
// both values (presumably overridden by concrete managers -- confirm).
void ConsensusManager::SetPrimary(uint32_t primary, uint64_t version) {
  (void)primary;
  (void)version;
}
// Returns the current primary id; this base implementation always reports 1.
uint32_t ConsensusManager::GetPrimary() {
  constexpr uint32_t kDefaultPrimary = 1;
  return kDefaultPrimary;
}
// Returns the current primary version; this base implementation always
// reports 1.
uint32_t ConsensusManager::GetVersion() {
  constexpr uint32_t kDefaultVersion = 1;
  return kDefaultVersion;
}
} // namespace resdb