in platform/networkstrate/consensus_manager.cpp [216:300]
int ConsensusManager::ProcessHeartBeat(std::unique_ptr<Context> context,
std::unique_ptr<Request> request) {
std::unique_lock<std::mutex> lk(hb_mutex_);
std::vector<ReplicaInfo> replicas = GetReplicas();
HeartBeatInfo hb_info;
if (!hb_info.ParseFromString(request->data())) {
LOG(ERROR) << "parse replica info fail\n";
return -1;
}
LOG(ERROR) << "receive public size:" << hb_info.public_keys().size()
<< " primary:" << hb_info.primary()
<< " version:" << hb_info.version()
<< " from region:" << request->region_info().region_id()
<< " sender:" << hb_info.sender()
<< " last send:" << hb_info.hb_version()
<< " current v:" << hb_[hb_info.sender()];
if (request->region_info().region_id() ==
config_.GetConfigData().self_region_id()) {
if (config_.GetPublicKeyCertificateInfo()
.public_key()
.public_key_info()
.type() == CertificateKeyInfo::CLIENT) {
// TODO count 2f+1 before setting a new primary
SetPrimary(hb_info.primary(), hb_info.version());
}
}
int replica_num = 0;
// Update the public keys received from others.
for (const auto& public_key : hb_info.public_keys()) {
if (verifier_ && !verifier_->AddPublicKey(public_key)) {
LOG(ERROR) << "set public key fail from:"
<< public_key.public_key_info().node_id();
continue;
}
if (request->region_info().region_id() !=
config_.GetConfigData().self_region_id()) {
// LOG(ERROR) << "key from other region:"
// << request->region_info().region_id();
continue;
}
ReplicaInfo info;
info.set_ip(public_key.public_key_info().ip());
info.set_port(public_key.public_key_info().port());
info.set_id(public_key.public_key_info().node_id());
if (info.ip().empty()) {
LOG(ERROR) << "public doesn't have ip, skip";
continue;
}
// Check whether there is a new replica joining.
// TODO notify new replica
if (public_key.public_key_info().type() == CertificateKeyInfo::REPLICA) {
replica_num++;
if (!ReplicaExisted(info, replicas)) {
// AddNewReplica(info);
}
} else {
if (!ReplicaExisted(info, clients_)) {
AddNewClient(info);
}
}
}
if (!hb_info.ip().empty() && hb_info.hb_version() > 0 &&
hb_[hb_info.sender()] != hb_info.hb_version()) {
ReplicaInfo info;
info.set_ip(hb_info.ip());
info.set_port(hb_info.port());
info.set_id(hb_info.sender());
// bc_client_->Flush(info);
hb_[hb_info.sender()] = hb_info.hb_version();
SendHeartBeat();
}
if (!is_ready_ && replica_num >= config_.GetMinDataReceiveNum()) {
LOG(ERROR) << "============ Server " << config_.GetSelfInfo().id()
<< " is ready "
"=====================";
is_ready_ = true;
}
return 0;
}