int64_t PeerMessageQueue::DoComputeNewWatermarkStaticMode()

in src/kudu/consensus/consensus_queue.cc [1052:1207]


int64_t PeerMessageQueue::DoComputeNewWatermarkStaticMode(
    const std::map<std::string, int>& voter_distribution,
    const std::map<std::string, std::vector<int64_t> >& watermarks_by_region,
    int64_t* watermark) {
  CHECK(watermark);
  CHECK(queue_state_.active_config->has_commit_rule());
  CHECK(queue_state_.active_config->commit_rule().rule_predicates_size() > 0);

  const QuorumMode& mode = queue_state_.active_config->commit_rule().mode();
  CHECK(mode == QuorumMode::STATIC_DISJUNCTION ||
      mode == QuorumMode::STATIC_CONJUNCTION);
  VLOG_WITH_PREFIX_UNLOCKED(1) << "Computing new commit index in static "
                               << ((mode == QuorumMode::STATIC_DISJUNCTION) ?
                                  "disjunction" : "conjunction")
                               << " mode";
  const auto& rule_predicates =
      queue_state_.active_config->commit_rule().rule_predicates();

  // For each individual predicate, the commit index corresponding to that
  // predicate is appeneded to the following vector. For eg. if the commit
  // rule is defined as:
  // p1: majority in 1 out of 3 regions in {R1, R2, R3}  AND / OR
  // p2: majority in 3 out of 5 regions in {R4, R5, R6, R7, R8},
  // then the following vector would have at most two entries denoting
  // the commit index allowed by each predicate p1 & p2.
  std::vector<int64_t> predicate_commit_indexes;

  for (const CommitRulePredicatePB& rule_predicate : rule_predicates) {
    int regions_subset_size = rule_predicate.regions_subset_size();

    if (VLOG_IS_ON(3)) {
      VLOG_WITH_PREFIX_UNLOCKED(3)
          << "Computing commit index for a predicate with "
          << rule_predicate.regions_size()
          << ", Number of majority regions required : "
          << regions_subset_size;
    }

    // For each of the regions featuring in a predicate, the following vector
    // stores commit indexes corresponding to those regions.
    // Lets take p2: majority in 3 out of 5 regions in {R4, R5, R6, R7, R8}
    // from the example above. The vector will contain commit indexes
    // corresponding to each of the regions R4, R5, R6, R7 and R8.
    std::vector<int64_t> regional_commit_indexes;

    for (const std::string& region : rule_predicate.regions()) {
      int total_voters = FindOrDie(voter_distribution, region);
      int commit_req = MajoritySize(total_voters);
      std::map<std::string, std::vector<int64_t> >::const_iterator it =
          watermarks_by_region.find(region);

      // If we haven't got responses from enough number of servers in region,
      // we simply move on.
      if (it == watermarks_by_region.end() || it->second.size() < commit_req) {
        if (VLOG_IS_ON(3)) {
          VLOG_WITH_PREFIX_UNLOCKED(3)
              << "Skipping region: " << region
              << ", Majority size: " << commit_req
              << ", Servers responded: "
              << ((it == watermarks_by_region.end()) ? 0 : it->second.size());
        }
        continue;
      }

      const std::vector<int64_t>& watermarks_in_region = it->second;

      // Computing the commit index in each region.
      int64_t regional_commit_index = watermarks_in_region[
          watermarks_in_region.size() - commit_req];

      if (VLOG_IS_ON(3)) {
        VLOG_WITH_PREFIX_UNLOCKED(3)
            << "Watermarks in region: " << region;
        for (int64_t watermark_it : watermarks_in_region) {
          VLOG_WITH_PREFIX_UNLOCKED(3)
              << "Watermark: " << watermark_it;
        }
        VLOG_WITH_PREFIX_UNLOCKED(3)
            << "Regional commit index: " << regional_commit_index;
      }

      regional_commit_indexes.push_back(regional_commit_index);
    }

    // If we haven't got enough majorities in regions listed in the predicate,
    // we simply move on.
    if (regional_commit_indexes.size() < regions_subset_size) {
      if (VLOG_IS_ON(3)) {
        VLOG_WITH_PREFIX_UNLOCKED(3)
            << "Skipping predicate."
            << " Number of regions required: " << regions_subset_size
            << ", Number of regions responded: "
            << regional_commit_indexes.size();
      }
      continue;
    }

    // Computing the commit index as per the predicate.
    int64_t predicate_commit_index =
        regional_commit_indexes[regional_commit_indexes.size() -
            regions_subset_size];
    if (VLOG_IS_ON(3)) {
      VLOG_WITH_PREFIX_UNLOCKED(3)
          << "Watermarks in regions: ";
      for (int64_t watermark_it : regional_commit_indexes) {
        VLOG_WITH_PREFIX_UNLOCKED(3)
            << "Watermark: " << watermark_it;
      }
      VLOG_WITH_PREFIX_UNLOCKED(3)
          << "Predicate commit index: " << predicate_commit_index;
    }
    predicate_commit_indexes.push_back(predicate_commit_index);
  }

  int64_t old_watermark = *watermark;
  if (mode == QuorumMode::STATIC_DISJUNCTION) {
    // Maximum commit index is chosen from the predicate commit indexes
    // because of disjunction.
    const std::vector<int64_t>::const_iterator it =
        std::max_element(predicate_commit_indexes.begin(),
                         predicate_commit_indexes.end());
    // Checking the possibility that predicate_commit_indexes
    // can be empty in case we didn't get majorities from enough
    // regions for any of the predicates.
    if (it != predicate_commit_indexes.end()) {
      *watermark = *it;
    } else if (VLOG_IS_ON(3)) {
       VLOG_WITH_PREFIX_UNLOCKED(3)
            << "None of the predicates have got enough majorities.";
    }
  }

  if (mode == QuorumMode::STATIC_CONJUNCTION) {
    // We only compute the new commit index if the all of the predicates have
    // contributed to the predicate_commit_indexes vector with their individual
    // commit_index. We cannot afford to overlook any single predicate in
    // the conjunctive mode.
    if (predicate_commit_indexes.size() == rule_predicates.size()) {
      // Minimum commit index is chosen from the predicate commit indexes
      // because of conjunction.
      const std::vector<int64_t>::const_iterator it =
          std::min_element(predicate_commit_indexes.begin(),
                           predicate_commit_indexes.end());
      CHECK(it != predicate_commit_indexes.end());
      *watermark = *it;
    } else if (VLOG_IS_ON(3)) {
      VLOG_WITH_PREFIX_UNLOCKED(3)
            << "At least one of the predicates hasn't got enough majorities."
            << " Number of predicates: " << rule_predicates.size()
            << " Number of predicates with enough majorities: "
            << predicate_commit_indexes.size();
    }
  }

  return old_watermark;
}