in src/kudu/consensus/raft_consensus.cc [2467:2605]
Status RaftConsensus::UnsafeChangeConfig(
    const UnsafeChangeConfigRequestPB& req,
    boost::optional<ServerErrorPB::Code>* error_code) {
  if (PREDICT_FALSE(!req.has_new_config())) {
    *error_code = ServerErrorPB::INVALID_CONFIG;
    return Status::InvalidArgument("Request must contain 'new_config' argument "
                                   "to UnsafeChangeConfig()", SecureShortDebugString(req));
  }
  if (PREDICT_FALSE(!req.has_caller_id())) {
    *error_code = ServerErrorPB::INVALID_CONFIG;
    return Status::InvalidArgument("Must specify 'caller_id' argument to UnsafeChangeConfig()",
                                   SecureShortDebugString(req));
  }
  // Grab the committed config and current term on this node.
  int64_t current_term;
  RaftConfigPB committed_config;
  int64_t all_replicated_index;
  int64_t last_committed_index;
  OpId preceding_opid;
  uint64_t msg_timestamp;
  {
    // Take the snapshot of the replica state and queue state so that
    // we can stick them in the consensus update request later.
    ThreadRestrictions::AssertWaitAllowed();
    LockGuard l(lock_);
    current_term = CurrentTermUnlocked();
    committed_config = cmeta_->CommittedConfig();
    if (cmeta_->has_pending_config()) {
      LOG_WITH_PREFIX_UNLOCKED(WARNING)
          << "Replica has a pending config, but the new config "
          << "will be unsafely changed anyway. "
          << "Currently pending config on the node: "
          << SecureShortDebugString(cmeta_->PendingConfig());
    }
    all_replicated_index = queue_->GetAllReplicatedIndex();
    last_committed_index = queue_->GetCommittedIndex();
    preceding_opid = queue_->GetLastOpIdInLog();
    msg_timestamp = time_manager_->GetSerialTimestamp().value();
  }
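  // NOTE: the preceding_opid and msg_timestamp captured above become the
  // preceding id and the timestamp of the CHANGE_CONFIG_OP synthesized below.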
  // Validate that the passed replica uuids are part of the committed config
  // on this node. This allows a manual recovery tool to only have to specify
  // the uuid of each replica in the new config without having to know the
  // addresses of each server (since we can get the address information from
  // the committed config). Additionally, only a subset of the committed config
  // is required for typical cluster repair scenarios.
  std::unordered_set<string> retained_peer_uuids;
  const RaftConfigPB& config = req.new_config();
  for (const RaftPeerPB& new_peer : config.peers()) {
    const string& peer_uuid = new_peer.permanent_uuid();
    retained_peer_uuids.insert(peer_uuid);
    if (!IsRaftConfigMember(peer_uuid, committed_config)) {
      *error_code = ServerErrorPB::INVALID_CONFIG;
      return Status::InvalidArgument(Substitute("Peer with uuid $0 is not in the committed "
                                                "config on this replica, rejecting the "
                                                "unsafe config change request for tablet $1. "
                                                "Committed config: $2",
                                                peer_uuid, req.tablet_id(),
                                                SecureShortDebugString(committed_config)));
    }
  }
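  // Build the new config by starting from the committed config and dropping
  // every peer whose UUID was not retained above; member types and host/port
  // information therefore carry over from the committed config.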
  RaftConfigPB new_config = committed_config;
  for (const auto& peer : committed_config.peers()) {
    const string& peer_uuid = peer.permanent_uuid();
    if (!ContainsKey(retained_peer_uuids, peer_uuid)) {
      CHECK(RemoveFromRaftConfig(&new_config, peer_uuid));
    }
  }

  // Check that the local peer is part of the new config and is a VOTER.
  // Although it is valid for a local replica to not have itself
  // in the committed config, it is rare, and a replica without itself
  // in the latest config is definitely not caught up with the latest leader's log.
  if (!IsRaftConfigVoter(peer_uuid(), new_config)) {
    *error_code = ServerErrorPB::INVALID_CONFIG;
    return Status::InvalidArgument(Substitute("Local replica uuid $0 is not "
                                              "a VOTER in the new config, "
                                              "rejecting the unsafe config "
                                              "change request for tablet $1. "
                                              "Rejected config: $2",
                                              peer_uuid(), req.tablet_id(),
                                              SecureShortDebugString(new_config)));
  }
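  // Flag the config as the product of an unsafe change and stamp it with the
  // opid index of the CHANGE_CONFIG_OP that will be appended below.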
  new_config.set_unsafe_config_change(true);
  int64_t replicate_opid_index = preceding_opid.index() + 1;
  new_config.set_opid_index(replicate_opid_index);

  // Sanity check the new config. 'type' is irrelevant here.
  Status s = VerifyRaftConfig(new_config);
  if (!s.ok()) {
    *error_code = ServerErrorPB::INVALID_CONFIG;
    return Status::InvalidArgument(Substitute("The resulting new config for tablet $0 "
                                              "from the passed parameters has failed the "
                                              "raft config sanity check: $1",
                                              req.tablet_id(), s.ToString()));
  }

  // Prepare the consensus request as if the request is being generated
  // from a different leader.
  ConsensusRequestPB consensus_req;
  consensus_req.set_caller_uuid(req.caller_id());
  // Bump up the term for the consensus request being generated.
  // This makes the request appear to come from a new leader that
  // the local replica doesn't know about yet. If the local replica
  // happens to be the leader, this will cause it to step down.
  const int64_t new_term = current_term + 1;
  consensus_req.set_caller_term(new_term);
  consensus_req.mutable_preceding_id()->CopyFrom(preceding_opid);
  consensus_req.set_committed_index(last_committed_index);
  consensus_req.set_all_replicated_index(all_replicated_index);

  // Prepare the replicate msg to be replicated.
  ReplicateMsg* replicate = consensus_req.add_ops();
  ChangeConfigRecordPB* cc_req = replicate->mutable_change_config_record();
  cc_req->set_tablet_id(req.tablet_id());
  *cc_req->mutable_old_config() = committed_config;
  *cc_req->mutable_new_config() = new_config;
  OpId* id = replicate->mutable_id();
  // Bump up both the term and the opid_index from what's found in the log.
  id->set_term(new_term);
  id->set_index(replicate_opid_index);
  replicate->set_op_type(CHANGE_CONFIG_OP);
  replicate->set_timestamp(msg_timestamp);
  VLOG_WITH_PREFIX(3) << "UnsafeChangeConfig: Generated consensus request: "
                      << SecureShortDebugString(consensus_req);

  LOG_WITH_PREFIX(WARNING)
      << "PROCEEDING WITH UNSAFE CONFIG CHANGE ON THIS SERVER, "
      << "COMMITTED CONFIG: " << SecureShortDebugString(committed_config)
      << ", NEW CONFIG: " << SecureShortDebugString(new_config);
  ConsensusResponsePB consensus_resp;
  return Update(&consensus_req, &consensus_resp).AndThen([&consensus_resp]{
    return consensus_resp.has_error()
        ? StatusFromPB(consensus_resp.error().status()) : Status::OK();
  });
}
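// ---------------------------------------------------------------------------
// Illustrative sketch, not part of raft_consensus.cc: one way a manual
// recovery tool might populate the UnsafeChangeConfigRequestPB consumed by
// UnsafeChangeConfig() above. BuildUnsafeChangeConfigRequest() is a
// hypothetical helper and the UUIDs passed to it are placeholders. Only the
// fields that UnsafeChangeConfig() reads (tablet_id, caller_id, new_config)
// are set, and only peer UUIDs are supplied, since addresses and member types
// are recovered from the committed config as described above.
UnsafeChangeConfigRequestPB BuildUnsafeChangeConfigRequest(
    const std::string& tablet_id,
    const std::string& caller_id,
    const std::vector<std::string>& retained_voter_uuids) {
  UnsafeChangeConfigRequestPB req;
  req.set_tablet_id(tablet_id);    // Tablet whose Raft config is being rewritten.
  req.set_caller_id(caller_id);    // Required by the has_caller_id() check above.
  RaftConfigPB* new_config = req.mutable_new_config();
  for (const auto& uuid : retained_voter_uuids) {
    // Each retained peer must already be a member of the committed config,
    // otherwise UnsafeChangeConfig() rejects the request.
    new_config->add_peers()->set_permanent_uuid(uuid);
  }
  return req;
}
// A recovery tool would then send this request to the surviving replica, e.g.
// via the consensus RPC endpoint that ultimately calls
// RaftConsensus::UnsafeChangeConfig().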