Status RaftConsensus::BulkChangeConfig()

in src/kudu/consensus/raft_consensus.cc [1898:2086]


Status RaftConsensus::BulkChangeConfig(const BulkChangeConfigRequestPB& req,
                                       StatusCallback client_cb,
                                       optional<TabletServerErrorPB::Code>* error_code) {
  TRACE_EVENT2("consensus", "RaftConsensus::BulkChangeConfig",
               "peer", peer_uuid(),
               "tablet", options_.tablet_id);
  {
    ThreadRestrictions::AssertWaitAllowed();
    std::lock_guard l(lock_);
    RETURN_NOT_OK(CheckRunningUnlocked());
    RETURN_NOT_OK(CheckActiveLeaderUnlocked());
    RETURN_NOT_OK(CheckNoConfigChangePendingUnlocked());

    // We are required by Raft to reject config change operations until we have
    // committed at least one operation in our current term as leader.
    // See https://groups.google.com/forum/#!topic/raft-dev/t4xj6dJTP6E
    if (!queue_->IsCommittedIndexInCurrentTerm()) {
      return Status::IllegalState("Leader has not yet committed an operation in its own term");
    }

    const RaftConfigPB committed_config = cmeta_->CommittedConfig();

    // Support atomic ChangeConfig requests.
    if (req.has_cas_config_opid_index()) {
      if (committed_config.opid_index() != req.cas_config_opid_index()) {
        *error_code = TabletServerErrorPB::CAS_FAILED;
        return Status::IllegalState(Substitute("Request specified cas_config_opid_index "
                                               "of $0 but the committed config has opid_index "
                                               "of $1",
                                               req.cas_config_opid_index(),
                                               committed_config.opid_index()));
      }
    }

    // 'new_config' will be modified in-place and validated before being used
    // as the new Raft configuration.
    RaftConfigPB new_config = committed_config;

    // Enforce the "one by one" config change rules, even with the bulk API.
    // Keep track of total voters added, including non-voters promoted to
    // voters, and removed, including voters demoted to non-voters.
    int num_voters_modified = 0;

    // A record of the peers being modified so that we can enforce only one
    // change per peer per request.
    unordered_set<string> peers_modified;

    for (const auto& item : req.config_changes()) {
      if (PREDICT_FALSE(!item.has_type())) {
        *error_code = TabletServerErrorPB::INVALID_CONFIG;
        return Status::InvalidArgument("Must specify 'type' argument",
                                       SecureShortDebugString(req));
      }
      if (PREDICT_FALSE(!item.has_peer())) {
        *error_code = TabletServerErrorPB::INVALID_CONFIG;
        return Status::InvalidArgument("Must specify 'peer' argument",
                                       SecureShortDebugString(req));
      }

      ChangeConfigType type = item.type();
      const RaftPeerPB& peer = item.peer();

      if (PREDICT_FALSE(!peer.has_permanent_uuid())) {
        return Status::InvalidArgument("peer must have permanent_uuid specified",
                                       SecureShortDebugString(req));
      }

      if (!InsertIfNotPresent(&peers_modified, peer.permanent_uuid())) {
        return Status::InvalidArgument(
            Substitute("only one change allowed per peer: peer $0 appears more "
                       "than once in the config change request",
                       peer.permanent_uuid()),
            SecureShortDebugString(req));
      }

      const string& server_uuid = peer.permanent_uuid();
      switch (type) {
        case ADD_PEER:
          // Ensure the peer we are adding is not already a member of the configuration.
          if (IsRaftConfigMember(server_uuid, committed_config)) {
            return Status::InvalidArgument(
                Substitute("Server with UUID $0 is already a member of the config. RaftConfig: $1",
                           server_uuid, SecureShortDebugString(committed_config)));
          }
          if (!peer.has_member_type()) {
            return Status::InvalidArgument("peer must have member_type specified",
                                           SecureShortDebugString(req));
          }
          if (!peer.has_last_known_addr()) {
            return Status::InvalidArgument("peer must have last_known_addr specified",
                                           SecureShortDebugString(req));
          }
          if (peer.member_type() == RaftPeerPB::VOTER) {
            num_voters_modified++;
          }
          *new_config.add_peers() = peer;
          break;

        case REMOVE_PEER:
          if (server_uuid == peer_uuid()) {
            return Status::InvalidArgument(
                Substitute("Cannot remove peer $0 from the config because it is the leader. "
                           "Force another leader to be elected to remove this peer. "
                           "Consensus state: $1",
                           server_uuid,
                           SecureShortDebugString(cmeta_->ToConsensusStatePB())));
          }
          if (!RemoveFromRaftConfig(&new_config, server_uuid)) {
            return Status::NotFound(
                Substitute("Server with UUID $0 not a member of the config. RaftConfig: $1",
                           server_uuid, SecureShortDebugString(committed_config)));
          }
          if (IsRaftConfigVoter(server_uuid, committed_config)) {
            num_voters_modified++;
          }
          break;

        case MODIFY_PEER: {
          RaftPeerPB* modified_peer;
          RETURN_NOT_OK(GetRaftConfigMember(&new_config, server_uuid, &modified_peer));
          const RaftPeerPB orig_peer(*modified_peer);
          // Override 'member_type' and items within 'attrs' only if they are
          // explicitly passed in the request. At least one field must be
          // modified to be a valid request.
          if (peer.has_member_type() && peer.member_type() != modified_peer->member_type()) {
            if (modified_peer->member_type() == RaftPeerPB::VOTER ||
                peer.member_type() == RaftPeerPB::VOTER) {
              // This is a 'member_type' change involving a VOTER, i.e. a
              // promotion or demotion.
              num_voters_modified++;
            }
            // A leader must be forced to step down before demoting it.
            if (server_uuid == peer_uuid()) {
              return Status::InvalidArgument(
                  Substitute("Cannot modify member type of peer $0 because it is the leader. "
                              "Cause another leader to be elected to modify this peer. "
                              "Consensus state: $1",
                              server_uuid,
                              SecureShortDebugString(cmeta_->ToConsensusStatePB())));
            }
            modified_peer->set_member_type(peer.member_type());
          }
          if (peer.attrs().has_promote()) {
            modified_peer->mutable_attrs()->set_promote(peer.attrs().promote());
          }
          if (peer.attrs().has_replace()) {
            modified_peer->mutable_attrs()->set_replace(peer.attrs().replace());
          }
          // Ensure that MODIFY_PEER actually modified something.
          if (MessageDifferencer::Equals(orig_peer, *modified_peer)) {
            return Status::InvalidArgument("must modify a field when calling MODIFY_PEER");
          }
          break;
        }

        default:
          return Status::NotSupported(Substitute(
              "$0: unsupported type of configuration change",
              ChangeConfigType_Name(type)));
      }
    }

    // Don't allow no-op config changes to be committed.
    if (MessageDifferencer::Equals(committed_config, new_config)) {
      return Status::InvalidArgument("requested configuration change does not "
                                     "actually modify the config",
                                     SecureShortDebugString(req));
    }

    // Ensure this wasn't an illegal bulk change.
    if (num_voters_modified > 1) {
      return Status::InvalidArgument("it is not safe to modify the VOTER status "
                                     "of more than one peer at a time",
                                     SecureShortDebugString(req));
    }

    // We'll assign a new opid_index to this config change.
    new_config.clear_opid_index();

    RETURN_NOT_OK(ReplicateConfigChangeUnlocked(
        committed_config, std::move(new_config),
        [this, client_cb](const Status& s) {
          this->MarkDirtyOnSuccess("Config change replication complete",
                                   client_cb, s);
        }));
  } // Release lock before signaling request.
  peer_manager_->SignalRequest();
  return Status::OK();
}