int ConsensusManager::ProcessHeartBeat()

in platform/networkstrate/consensus_manager.cpp [216:300]


int ConsensusManager::ProcessHeartBeat(std::unique_ptr<Context> context,
                                       std::unique_ptr<Request> request) {
  std::unique_lock<std::mutex> lk(hb_mutex_);
  std::vector<ReplicaInfo> replicas = GetReplicas();
  HeartBeatInfo hb_info;
  if (!hb_info.ParseFromString(request->data())) {
    LOG(ERROR) << "parse replica info fail\n";
    return -1;
  }

  LOG(ERROR) << "receive public size:" << hb_info.public_keys().size()
             << " primary:" << hb_info.primary()
             << " version:" << hb_info.version()
             << " from region:" << request->region_info().region_id()
             << " sender:" << hb_info.sender()
             << " last send:" << hb_info.hb_version()
             << " current v:" << hb_[hb_info.sender()];

  if (request->region_info().region_id() ==
      config_.GetConfigData().self_region_id()) {
    if (config_.GetPublicKeyCertificateInfo()
            .public_key()
            .public_key_info()
            .type() == CertificateKeyInfo::CLIENT) {
      // TODO count 2f+1 before setting a new primary
      SetPrimary(hb_info.primary(), hb_info.version());
    }
  }

  int replica_num = 0;
  // Update the public keys received from others.
  for (const auto& public_key : hb_info.public_keys()) {
    if (verifier_ && !verifier_->AddPublicKey(public_key)) {
      LOG(ERROR) << "set public key fail from:"
                 << public_key.public_key_info().node_id();
      continue;
    }
    if (request->region_info().region_id() !=
        config_.GetConfigData().self_region_id()) {
      // LOG(ERROR) << "key from other region:"
      //           << request->region_info().region_id();
      continue;
    }

    ReplicaInfo info;
    info.set_ip(public_key.public_key_info().ip());
    info.set_port(public_key.public_key_info().port());
    info.set_id(public_key.public_key_info().node_id());
    if (info.ip().empty()) {
      LOG(ERROR) << "public doesn't have ip, skip";
      continue;
    }
    // Check whether there is a new replica joining.
    // TODO notify new replica
    if (public_key.public_key_info().type() == CertificateKeyInfo::REPLICA) {
      replica_num++;
      if (!ReplicaExisted(info, replicas)) {
        // AddNewReplica(info);
      }
    } else {
      if (!ReplicaExisted(info, clients_)) {
        AddNewClient(info);
      }
    }
  }

  if (!hb_info.ip().empty() && hb_info.hb_version() > 0 &&
      hb_[hb_info.sender()] != hb_info.hb_version()) {
    ReplicaInfo info;
    info.set_ip(hb_info.ip());
    info.set_port(hb_info.port());
    info.set_id(hb_info.sender());
    // bc_client_->Flush(info);
    hb_[hb_info.sender()] = hb_info.hb_version();
    SendHeartBeat();
  }

  if (!is_ready_ && replica_num >= config_.GetMinDataReceiveNum()) {
    LOG(ERROR) << "============ Server " << config_.GetSelfInfo().id()
               << " is ready "
                  "=====================";
    is_ready_ = true;
  }
  return 0;
}