in src/kudu/consensus/raft_consensus.cc [1898:2086]
// Validates a batch of membership changes (ADD_PEER, REMOVE_PEER, MODIFY_PEER)
// against the committed Raft config, folds them into a single new config and
// submits that config for replication.
//
// All validation and the call to ReplicateConfigChangeUnlocked() happen while
// holding 'lock_'; the peer manager is signaled only after the lock has been
// released.
//
// 'client_cb' is forwarded to MarkDirtyOnSuccess() from the callback handed to
// ReplicateConfigChangeUnlocked().
//
// '*error_code' is written only in two situations: CAS_FAILED when the
// request's 'cas_config_opid_index' does not match the committed config, and
// INVALID_CONFIG when a change item lacks its 'type' or 'peer' field. Every
// other failure path leaves it untouched.
//
// Returns OK once the change has been submitted and the peers signaled;
// otherwise the first failing check's status is returned and nothing is
// replicated.
Status RaftConsensus::BulkChangeConfig(const BulkChangeConfigRequestPB& req,
                                       StatusCallback client_cb,
                                       optional<TabletServerErrorPB::Code>* error_code) {
  TRACE_EVENT2("consensus", "RaftConsensus::BulkChangeConfig",
               "peer", peer_uuid(),
               "tablet", options_.tablet_id);
  {
    ThreadRestrictions::AssertWaitAllowed();
    std::lock_guard guard(lock_);

    // Preconditions, checked in this order: running, active leader, and no
    // config change already pending.
    RETURN_NOT_OK(CheckRunningUnlocked());
    RETURN_NOT_OK(CheckActiveLeaderUnlocked());
    RETURN_NOT_OK(CheckNoConfigChangePendingUnlocked());

    // Raft requires a leader to refuse config changes until it has committed
    // at least one operation in its current term.
    // See https://groups.google.com/forum/#!topic/raft-dev/t4xj6dJTP6E
    if (!queue_->IsCommittedIndexInCurrentTerm()) {
      return Status::IllegalState("Leader has not yet committed an operation in its own term");
    }

    const RaftConfigPB committed_config(cmeta_->CommittedConfig());

    // Compare-and-swap support: the caller may pin the config it expects to
    // be modifying.
    if (req.has_cas_config_opid_index() &&
        committed_config.opid_index() != req.cas_config_opid_index()) {
      *error_code = TabletServerErrorPB::CAS_FAILED;
      return Status::IllegalState(Substitute("Request specified cas_config_opid_index "
                                             "of $0 but the committed config has opid_index "
                                             "of $1",
                                             req.cas_config_opid_index(),
                                             committed_config.opid_index()));
    }

    // Working copy: each change below is applied to 'new_config' in place and
    // the result is validated before it is used.
    RaftConfigPB new_config(committed_config);

    // The "one by one" rule still applies to the bulk API. This counts voters
    // added (including promotions) and voters removed (including demotions).
    int voter_changes = 0;

    // UUIDs already touched by this request; a peer may appear only once.
    unordered_set<string> seen_uuids;

    for (const auto& change : req.config_changes()) {
      if (PREDICT_FALSE(!change.has_type())) {
        *error_code = TabletServerErrorPB::INVALID_CONFIG;
        return Status::InvalidArgument("Must specify 'type' argument",
                                       SecureShortDebugString(req));
      }
      if (PREDICT_FALSE(!change.has_peer())) {
        *error_code = TabletServerErrorPB::INVALID_CONFIG;
        return Status::InvalidArgument("Must specify 'peer' argument",
                                       SecureShortDebugString(req));
      }

      const ChangeConfigType change_type = change.type();
      const RaftPeerPB& requested_peer = change.peer();

      if (PREDICT_FALSE(!requested_peer.has_permanent_uuid())) {
        return Status::InvalidArgument("peer must have permanent_uuid specified",
                                       SecureShortDebugString(req));
      }

      const string& uuid = requested_peer.permanent_uuid();
      if (!InsertIfNotPresent(&seen_uuids, uuid)) {
        return Status::InvalidArgument(
            Substitute("only one change allowed per peer: peer $0 appears more "
                       "than once in the config change request",
                       uuid),
            SecureShortDebugString(req));
      }

      switch (change_type) {
        case ADD_PEER: {
          // The peer being added must not already be part of the config.
          if (IsRaftConfigMember(uuid, committed_config)) {
            return Status::InvalidArgument(
                Substitute("Server with UUID $0 is already a member of the config. RaftConfig: $1",
                           uuid, SecureShortDebugString(committed_config)));
          }
          if (!requested_peer.has_member_type()) {
            return Status::InvalidArgument("peer must have member_type specified",
                                           SecureShortDebugString(req));
          }
          if (!requested_peer.has_last_known_addr()) {
            return Status::InvalidArgument("peer must have last_known_addr specified",
                                           SecureShortDebugString(req));
          }
          if (requested_peer.member_type() == RaftPeerPB::VOTER) {
            ++voter_changes;
          }
          *new_config.add_peers() = requested_peer;
          break;
        }

        case REMOVE_PEER: {
          // This replica is the leader (checked above) and may not remove
          // itself.
          if (uuid == peer_uuid()) {
            return Status::InvalidArgument(
                Substitute("Cannot remove peer $0 from the config because it is the leader. "
                           "Force another leader to be elected to remove this peer. "
                           "Consensus state: $1",
                           uuid,
                           SecureShortDebugString(cmeta_->ToConsensusStatePB())));
          }
          if (!RemoveFromRaftConfig(&new_config, uuid)) {
            return Status::NotFound(
                Substitute("Server with UUID $0 not a member of the config. RaftConfig: $1",
                           uuid, SecureShortDebugString(committed_config)));
          }
          if (IsRaftConfigVoter(uuid, committed_config)) {
            ++voter_changes;
          }
          break;
        }

        case MODIFY_PEER: {
          RaftPeerPB* target = nullptr;
          RETURN_NOT_OK(GetRaftConfigMember(&new_config, uuid, &target));

          // Snapshot taken so a request that changes nothing can be rejected.
          const RaftPeerPB before_change = *target;

          // 'member_type' and the individual 'attrs' fields are overridden
          // only when the request sets them explicitly.
          if (requested_peer.has_member_type() &&
              requested_peer.member_type() != target->member_type()) {
            const bool involves_voter =
                target->member_type() == RaftPeerPB::VOTER ||
                requested_peer.member_type() == RaftPeerPB::VOTER;
            if (involves_voter) {
              // A promotion to, or demotion from, VOTER.
              ++voter_changes;
            }
            // The leader has to step down before its member type can change.
            if (uuid == peer_uuid()) {
              return Status::InvalidArgument(
                  Substitute("Cannot modify member type of peer $0 because it is the leader. "
                             "Cause another leader to be elected to modify this peer. "
                             "Consensus state: $1",
                             uuid,
                             SecureShortDebugString(cmeta_->ToConsensusStatePB())));
            }
            target->set_member_type(requested_peer.member_type());
          }

          const auto& requested_attrs = requested_peer.attrs();
          if (requested_attrs.has_promote()) {
            target->mutable_attrs()->set_promote(requested_attrs.promote());
          }
          if (requested_attrs.has_replace()) {
            target->mutable_attrs()->set_replace(requested_attrs.replace());
          }

          // MODIFY_PEER has to change at least one field.
          if (MessageDifferencer::Equals(before_change, *target)) {
            return Status::InvalidArgument("must modify a field when calling MODIFY_PEER");
          }
          break;
        }

        default: {
          return Status::NotSupported(Substitute(
              "$0: unsupported type of configuration change",
              ChangeConfigType_Name(change_type)));
        }
      }
    }

    // Refuse to commit a config identical to the current one.
    if (MessageDifferencer::Equals(committed_config, new_config)) {
      return Status::InvalidArgument("requested configuration change does not "
                                     "actually modify the config",
                                     SecureShortDebugString(req));
    }

    // Refuse a bulk change that alters the voter status of several peers.
    if (voter_changes > 1) {
      return Status::InvalidArgument("it is not safe to modify the VOTER status "
                                     "of more than one peer at a time",
                                     SecureShortDebugString(req));
    }

    // The config change will be assigned a fresh opid_index.
    new_config.clear_opid_index();

    auto on_replicated = [this, client_cb](const Status& status) {
      MarkDirtyOnSuccess("Config change replication complete",
                         client_cb, status);
    };
    RETURN_NOT_OK(ReplicateConfigChangeUnlocked(
        committed_config, std::move(new_config), std::move(on_replicated)));
  }  // 'lock_' is released here, before the peers are signaled.

  peer_manager_->SignalRequest();
  return Status::OK();
}