Status RaftConsensus::CheckBulkConfigChangeAndGetNewConfigUnlocked()

in src/kudu/consensus/raft_consensus.cc [2231:2465]


// Validates a bulk Raft configuration change request against the currently
// committed configuration and, on success, writes the resulting configuration
// to 'new_config'.
//
// Preconditions: 'lock_' must be held by the caller (DCHECKed). The function
// also requires that this replica is running, is the active leader, has no
// config change pending, and has committed at least one operation in its
// current term; otherwise it returns the corresponding error.
//
// 'req'        - the bulk request. Every item must carry a 'type' and a 'peer'
//                with a 'permanent_uuid', and each peer UUID may appear at most
//                once in the request.
// 'error_code' - set to CAS_FAILED when 'cas_config_opid_index' does not match
//                the committed config's opid_index, or to INVALID_CONFIG when
//                an item lacks 'type' or 'peer'. Not touched on other errors.
// 'new_config' - on success: the committed config with all requested changes
//                applied and 'opid_index' cleared. On failure its contents are
//                unspecified (it may hold a partially-applied config).
//
// Returns OK on success, otherwise the status describing the first violated
// rule (IllegalState, InvalidArgument, NotFound, NotSupported, or an error
// propagated from the precondition checks / GetRaftConfigMember). At most one
// peer's VOTER status may change per request ("one by one" membership changes).
Status RaftConsensus::CheckBulkConfigChangeAndGetNewConfigUnlocked(
      const BulkChangeConfigRequestPB& req,
      boost::optional<ServerErrorPB::Code>* error_code,
      RaftConfigPB *new_config) {
  DCHECK(lock_.is_locked());
  RETURN_NOT_OK(CheckRunningUnlocked());
  RETURN_NOT_OK(CheckActiveLeaderUnlocked());
  RETURN_NOT_OK(CheckNoConfigChangePendingUnlocked());

  // We are required by Raft to reject config change operations until we have
  // committed at least one operation in our current term as leader.
  // See https://groups.google.com/forum/#!topic/raft-dev/t4xj6dJTP6E
  if (!queue_->IsCommittedIndexInCurrentTerm()) {
    return Status::IllegalState("Leader has not yet committed an operation in its own term");
  }

  // A local copy of the committed config: all membership checks below are made
  // against this snapshot, while 'new_config' accumulates the edits.
  const RaftConfigPB committed_config = cmeta_->CommittedConfig();

  // Support atomic (compare-and-swap) ChangeConfig requests.
  if (req.has_cas_config_opid_index() &&
      committed_config.opid_index() != req.cas_config_opid_index()) {
    *error_code = ServerErrorPB::CAS_FAILED;
    return Status::IllegalState(Substitute("Request specified cas_config_opid_index "
                                           "of $0 but the committed config has opid_index "
                                           "of $1",
                                           req.cas_config_opid_index(),
                                           committed_config.opid_index()));
  }

  // 'new_config' will be modified in-place and validated before being used
  // as the new Raft configuration.
  *new_config = committed_config;

  // Enforce the "one by one" config change rules, even with the bulk API.
  // Keep track of total voters added, including non-voters promoted to
  // voters, and removed, including voters demoted to non-voters.
  int num_voters_modified = 0;

  // A record of the peers being modified so that we can enforce only one
  // change per peer per request.
  unordered_set<string> peers_modified;

  for (const auto& item : req.config_changes()) {
    if (PREDICT_FALSE(!item.has_type())) {
      *error_code = ServerErrorPB::INVALID_CONFIG;
      return Status::InvalidArgument("Must specify 'type' argument",
                                     SecureShortDebugString(req));
    }
    if (PREDICT_FALSE(!item.has_peer())) {
      *error_code = ServerErrorPB::INVALID_CONFIG;
      return Status::InvalidArgument("Must specify 'peer' argument",
                                     SecureShortDebugString(req));
    }

    const ChangeConfigType type = item.type();
    const RaftPeerPB& peer = item.peer();

    if (PREDICT_FALSE(!peer.has_permanent_uuid())) {
      return Status::InvalidArgument("peer must have permanent_uuid specified",
                                     SecureShortDebugString(req));
    }

    if (!InsertIfNotPresent(&peers_modified, peer.permanent_uuid())) {
      return Status::InvalidArgument(
          Substitute("only one change allowed per peer: peer $0 appears more "
                     "than once in the config change request",
                     peer.permanent_uuid()),
          SecureShortDebugString(req));
    }

    const string& server_uuid = peer.permanent_uuid();
    switch (type) {
      case ADD_PEER:
        // Ensure the peer we are adding is not already a member of the configuration.
        if (IsRaftConfigMember(server_uuid, committed_config)) {
          return Status::InvalidArgument(
              Substitute("Server with UUID $0 is already a member of the config. RaftConfig: $1",
                         server_uuid, SecureShortDebugString(committed_config)));
        }
        if (!peer.has_member_type()) {
          return Status::InvalidArgument("peer must have member_type specified",
                                         SecureShortDebugString(req));
        }
        if (!peer.has_last_known_addr()) {
          return Status::InvalidArgument("peer must have last_known_addr specified",
                                         SecureShortDebugString(req));
        }
        if (peer.member_type() == RaftPeerPB::VOTER) {
          num_voters_modified++;
        }
        *(new_config->add_peers()) = peer;
        break;

      case REMOVE_PEER:
        if (server_uuid == peer_uuid()) {
          return Status::InvalidArgument(
              Substitute("Cannot remove peer $0 from the config because it is the leader. "
                         "Force another leader to be elected to remove this peer. "
                         "Consensus state: $1",
                         server_uuid,
                         SecureShortDebugString(cmeta_->ToConsensusStatePB())));
        }
        if (!RemoveFromRaftConfig(new_config, server_uuid)) {
          return Status::NotFound(
              Substitute("Server with UUID $0 not a member of the config. RaftConfig: $1",
                         server_uuid, SecureShortDebugString(committed_config)));
        }
        if (IsRaftConfigVoter(server_uuid, committed_config)) {
          num_voters_modified++;

          // In flexi-raft mode, make sure the number of voters in the removed
          // peer's region does not dip below the majority of that region's
          // expected voter count (from 'voter_distribution'). For example, with
          // 6 expected voters the quorum is MajoritySize(6) = 4: two voters can
          // be removed (leaving 4), but removing a third (leaving 3) is rejected
          // because commits would no longer be possible.
          // In SINGLE_REGION_DYNAMIC mode this is only enforced in the current
          // leader's region; other regions may go below the requirement so that
          // automation can replace nodes without impacting write availability.
          if (FLAGS_enable_flexi_raft) {
            const auto& vd_map = committed_config.voter_distribution();

            // Number of voters per region in the committed config.
            std::map<std::string, int> voters_in_config_per_region;
            std::string unused_leader_region;
            std::string unused_leader_uuid;
            GetRegionalCountsFromConfig(
                committed_config, unused_leader_uuid, &voters_in_config_per_region,
                &unused_leader_region);

            const bool srd_mode = committed_config.has_commit_rule() &&
                committed_config.commit_rule().mode() == QuorumMode::SINGLE_REGION_DYNAMIC;

            // Named 'member' so it does not shadow the request's 'peer'.
            for (const RaftPeerPB& member : committed_config.peers()) {
              if (member.permanent_uuid() != server_uuid) {
                continue;
              }

              // Found the committed entry for the peer being removed.
              const std::string& region = member.attrs().region();

              // The local peer is the leader (CheckActiveLeaderUnlocked above),
              // so peer_region() is the leader region.
              if (srd_mode && region != peer_region()) {
                break;
              }

              // Read without default-inserting into the map.
              const auto count_itr = voters_in_config_per_region.find(region);
              const int current_count =
                  count_itr == voters_in_config_per_region.end() ? 0 : count_itr->second;
              const int future_count = current_count - 1;

              const auto vd_itr = vd_map.find(region);
              if (vd_itr != vd_map.end()) {
                const int expected_voters = vd_itr->second;
                const int quorum = MajoritySize(expected_voters);
                if (future_count < quorum) {
                  return Status::InvalidArgument(Substitute(
                      "Cannot remove a voter in region: $0 which will make future voter "
                      "count: $1 dip below quorum: $2 (majority of $3 expected voters)",
                      region, future_count, quorum, expected_voters));
                }
              }
              break;
            }
          }
        }
        break;

      case MODIFY_PEER: {
        LOG(INFO) << "modifying peer " << SecureShortDebugString(peer);
        RaftPeerPB* modified_peer;
        RETURN_NOT_OK(GetRaftConfigMember(new_config, server_uuid, &modified_peer));
        const RaftPeerPB orig_peer(*modified_peer);
        // Override 'member_type' and items within 'attrs' only if they are
        // explicitly passed in the request. At least one field must be
        // modified to be a valid request.
        if (peer.has_member_type() && peer.member_type() != modified_peer->member_type()) {
          if (modified_peer->member_type() == RaftPeerPB::VOTER ||
              peer.member_type() == RaftPeerPB::VOTER) {
            // This is a 'member_type' change involving a VOTER, i.e. a
            // promotion or demotion.
            num_voters_modified++;
          }
          // A leader must be forced to step down before demoting it.
          if (server_uuid == peer_uuid()) {
            return Status::InvalidArgument(
                Substitute("Cannot modify member type of peer $0 because it is the leader. "
                           "Cause another leader to be elected to modify this peer. "
                           "Consensus state: $1",
                           server_uuid,
                           SecureShortDebugString(cmeta_->ToConsensusStatePB())));
          }
          modified_peer->set_member_type(peer.member_type());
        }
        if (peer.attrs().has_promote()) {
          modified_peer->mutable_attrs()->set_promote(peer.attrs().promote());
        }
        if (peer.attrs().has_replace()) {
          modified_peer->mutable_attrs()->set_replace(peer.attrs().replace());
        }
        // Ensure that MODIFY_PEER actually modified something.
        if (MessageDifferencer::Equals(orig_peer, *modified_peer)) {
          return Status::InvalidArgument("must modify a field when calling MODIFY_PEER");
        }
        break;
      }

      default:
        return Status::NotSupported(Substitute(
            "$0: unsupported type of configuration change",
            ChangeConfigType_Name(type)));
    }
  }

  // Don't allow no-op config changes to be committed.
  if (MessageDifferencer::Equals(committed_config, *new_config)) {
    return Status::InvalidArgument("requested configuration change does not "
                                   "actually modify the config",
                                   SecureShortDebugString(req));
  }

  // Ensure this wasn't an illegal bulk change.
  if (num_voters_modified > 1) {
    return Status::InvalidArgument("it is not safe to modify the VOTER status "
                                   "of more than one peer at a time",
                                   SecureShortDebugString(req));
  }

  // We'll assign a new opid_index to this config change.
  new_config->clear_opid_index();
  return Status::OK();
}