Status RaftConsensus::UnsafeChangeConfig()

in src/kudu/consensus/raft_consensus.cc [2467:2605]


Status RaftConsensus::UnsafeChangeConfig(
    const UnsafeChangeConfigRequestPB& req,
    boost::optional<ServerErrorPB::Code>* error_code) {
  if (PREDICT_FALSE(!req.has_new_config())) {
    *error_code = ServerErrorPB::INVALID_CONFIG;
    return Status::InvalidArgument("Request must contain 'new_config' argument "
                                   "to UnsafeChangeConfig()", SecureShortDebugString(req));
  }
  if (PREDICT_FALSE(!req.has_caller_id())) {
    *error_code = ServerErrorPB::INVALID_CONFIG;
    return Status::InvalidArgument("Must specify 'caller_id' argument to UnsafeChangeConfig()",
                                   SecureShortDebugString(req));
  }

  // Grab the committed config and current term on this node.
  int64_t current_term;
  RaftConfigPB committed_config;
  int64_t all_replicated_index;
  int64_t last_committed_index;
  OpId preceding_opid;
  uint64_t msg_timestamp;
  {
    // Take a snapshot of the replica state and queue state so that
    // we can stick them into the consensus update request later.
    ThreadRestrictions::AssertWaitAllowed();
    LockGuard l(lock_);
    current_term = CurrentTermUnlocked();
    committed_config = cmeta_->CommittedConfig();
    if (cmeta_->has_pending_config()) {
      LOG_WITH_PREFIX_UNLOCKED(WARNING)
            << "Replica has a pending config, but the new config "
            << "will be unsafely changed anyway. "
            << "Currently pending config on the node: "
            << SecureShortDebugString(cmeta_->PendingConfig());
    }
    all_replicated_index = queue_->GetAllReplicatedIndex();
    last_committed_index = queue_->GetCommittedIndex();
    preceding_opid = queue_->GetLastOpIdInLog();
    msg_timestamp = time_manager_->GetSerialTimestamp().value();
  }

  // Validate that the replica uuids passed in the request are part of the
  // committed config
  // on this node.  This allows a manual recovery tool to only have to specify
  // the uuid of each replica in the new config without having to know the
  // addresses of each server (since we can get the address information from
  // the committed config). Additionally, only a subset of the committed config
  // is required for typical cluster repair scenarios.
  std::unordered_set<string> retained_peer_uuids;
  const RaftConfigPB& config = req.new_config();
  for (const RaftPeerPB& new_peer : config.peers()) {
    const string& peer_uuid = new_peer.permanent_uuid();
    retained_peer_uuids.insert(peer_uuid);
    if (!IsRaftConfigMember(peer_uuid, committed_config)) {
      *error_code = ServerErrorPB::INVALID_CONFIG;
      return Status::InvalidArgument(Substitute("Peer with uuid $0 is not in the committed  "
                                                "config on this replica, rejecting the  "
                                                "unsafe config change request for tablet $1. "
                                                "Committed config: $2",
                                                peer_uuid, req.tablet_id(),
                                                SecureShortDebugString(committed_config)));
    }
  }

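  // Derive the new config from the committed one by dropping every peer
  // whose uuid was not retained in the request.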
  RaftConfigPB new_config = committed_config;
  for (const auto& peer : committed_config.peers()) {
    const string& peer_uuid = peer.permanent_uuid();
    if (!ContainsKey(retained_peer_uuids, peer_uuid)) {
      CHECK(RemoveFromRaftConfig(&new_config, peer_uuid));
    }
  }
  // Check that the local peer is part of the new config and is a VOTER.
  // Although it is valid for a local replica to not have itself
  // in the committed config, that is rare, and a replica absent from
  // the latest config is certainly not caught up with the latest leader's log.
  if (!IsRaftConfigVoter(peer_uuid(), new_config)) {
    *error_code = ServerErrorPB::INVALID_CONFIG;
    return Status::InvalidArgument(Substitute("Local replica uuid $0 is not "
                                              "a VOTER in the new config, "
                                              "rejecting the unsafe config "
                                              "change request for tablet $1. "
                                              "Rejected config: $2" ,
                                              peer_uuid(), req.tablet_id(),
                                              SecureShortDebugString(new_config)));
  }
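  // Flag the config as the product of an unsafe change and stamp it with the
  // opid_index of the CHANGE_CONFIG_OP fabricated below, i.e. one past the
  // last op in the local log.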
  new_config.set_unsafe_config_change(true);
  int64_t replicate_opid_index = preceding_opid.index() + 1;
  new_config.set_opid_index(replicate_opid_index);

  // Sanity check the new config before fabricating the change-config op.
  Status s = VerifyRaftConfig(new_config);
  if (!s.ok()) {
    *error_code = ServerErrorPB::INVALID_CONFIG;
    return Status::InvalidArgument(Substitute("The resulting new config for tablet $0  "
                                              "from passed parameters has failed raft "
                                              "config sanity check: $1",
                                              req.tablet_id(), s.ToString()));
  }

  // Prepare the consensus request as if the request is being generated
  // from a different leader.
  ConsensusRequestPB consensus_req;
  consensus_req.set_caller_uuid(req.caller_id());
  // Bumping up the term for the consensus request being generated.
  // This makes this request appear to come from a new leader that
  // the local replica doesn't know about yet. If the local replica
  // happens to be the leader, this will cause it to step down.
  const int64_t new_term = current_term + 1;
  consensus_req.set_caller_term(new_term);
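  // Anchor the request on the local log: use the last opid in the log as the
  // preceding id and reuse the locally observed commit/replication watermarks,
  // so the request passes the usual consistency checks in Update().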
  consensus_req.mutable_preceding_id()->CopyFrom(preceding_opid);
  consensus_req.set_committed_index(last_committed_index);
  consensus_req.set_all_replicated_index(all_replicated_index);

  // Prepare the replicate msg to be replicated.
  ReplicateMsg* replicate = consensus_req.add_ops();
  ChangeConfigRecordPB* cc_req = replicate->mutable_change_config_record();
  cc_req->set_tablet_id(req.tablet_id());
  *cc_req->mutable_old_config() = committed_config;
  *cc_req->mutable_new_config() = new_config;
  OpId* id = replicate->mutable_id();
  // Bumping up both the term and the opid_index from what's found in the log.
  id->set_term(new_term);
  id->set_index(replicate_opid_index);
  replicate->set_op_type(CHANGE_CONFIG_OP);
  replicate->set_timestamp(msg_timestamp);

  VLOG_WITH_PREFIX(3) << "UnsafeChangeConfig: Generated consensus request: "
                      << SecureShortDebugString(consensus_req);

  LOG_WITH_PREFIX(WARNING)
        << "PROCEEDING WITH UNSAFE CONFIG CHANGE ON THIS SERVER, "
        << "COMMITTED CONFIG: " << SecureShortDebugString(committed_config)
        << ", NEW CONFIG: " << SecureShortDebugString(new_config);

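  // Push the fabricated request through the regular Update() path, as if it
  // had arrived from the fictitious new leader; the CHANGE_CONFIG_OP is then
  // appended to the log and processed like any other leader request.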
  ConsensusResponsePB consensus_resp;
  return Update(&consensus_req, &consensus_resp).AndThen([&consensus_resp]{
    return consensus_resp.has_error()
        ? StatusFromPB(consensus_resp.error().status()) : Status::OK();
  });
}
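
For reference, a minimal sketch of how a manual recovery tool might build the
request consumed above. This is illustrative only, not code from the Kudu tree:
'consensus', 'target_tablet_id', and 'retained_uuids' are hypothetical names,
and only the fields this method actually reads (tablet_id, caller_id,
new_config) are populated, since peer addresses are recovered from the
committed config.

// Hypothetical caller-side sketch; error handling elided.
UnsafeChangeConfigRequestPB req;
req.set_tablet_id(target_tablet_id);
// caller_id becomes the caller_uuid of the forged consensus request.
req.set_caller_id("manual-recovery-tool");
for (const std::string& uuid : retained_uuids) {
  // Only the uuid is needed: UnsafeChangeConfig() rejects any uuid that is
  // not already in the committed config, and all other peer info is copied
  // from that committed config.
  req.mutable_new_config()->add_peers()->set_permanent_uuid(uuid);
}
boost::optional<ServerErrorPB::Code> error_code;
Status s = consensus->UnsafeChangeConfig(req, &error_code);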