in src/kudu/consensus/consensus_queue.cc [1052:1207]
// Computes the commit watermark permitted by the active config's commit rule
// when the rule's quorum mode is STATIC_DISJUNCTION or STATIC_CONJUNCTION.
//
// Parameters:
//   voter_distribution   - region name -> number of voters in that region.
//                          Every region named in a predicate is looked up with
//                          FindOrDie(), so it must be present in this map.
//   watermarks_by_region - region name -> watermarks of the servers in that
//                          region that have responded. A region may be absent.
//   watermark            - in/out. Overwritten with the new commit index when
//                          the rule is satisfied; left untouched otherwise.
//
// Returns the value '*watermark' held on entry.
//
// For each predicate, a per-region commit index is computed for every region
// that has responses from at least a majority of its voters. If enough regions
// qualify, the predicate's commit index is the 'regions_subset_size'-th largest
// of those regional commit indexes. The per-predicate results are then combined
// with max (disjunction) or, if every predicate produced a result, min
// (conjunction).
int64_t PeerMessageQueue::DoComputeNewWatermarkStaticMode(
    const std::map<std::string, int>& voter_distribution,
    const std::map<std::string, std::vector<int64_t> >& watermarks_by_region,
    int64_t* watermark) {
  CHECK(watermark);
  CHECK(queue_state_.active_config->has_commit_rule());
  CHECK(queue_state_.active_config->commit_rule().rule_predicates_size() > 0);

  const QuorumMode& mode = queue_state_.active_config->commit_rule().mode();
  CHECK(mode == QuorumMode::STATIC_DISJUNCTION ||
        mode == QuorumMode::STATIC_CONJUNCTION);

  VLOG_WITH_PREFIX_UNLOCKED(1) << "Computing new commit index in static "
      << ((mode == QuorumMode::STATIC_DISJUNCTION) ?
          "disjunction" : "conjunction")
      << " mode";

  const auto& rule_predicates =
      queue_state_.active_config->commit_rule().rule_predicates();

  // For each individual predicate, the commit index corresponding to that
  // predicate is appended to the following vector. For eg. if the commit
  // rule is defined as:
  // p1: majority in 1 out of 3 regions in {R1, R2, R3} AND / OR
  // p2: majority in 3 out of 5 regions in {R4, R5, R6, R7, R8},
  // then the following vector would have at most two entries denoting
  // the commit index allowed by each predicate p1 & p2.
  std::vector<int64_t> predicate_commit_indexes;

  for (const CommitRulePredicatePB& rule_predicate : rule_predicates) {
    int regions_subset_size = rule_predicate.regions_subset_size();
    if (VLOG_IS_ON(3)) {
      VLOG_WITH_PREFIX_UNLOCKED(3)
          << "Computing commit index for a predicate with "
          << rule_predicate.regions_size()
          << ", Number of majority regions required : "
          << regions_subset_size;
    }

    // For each of the regions featuring in a predicate, the following vector
    // stores commit indexes corresponding to those regions.
    // Lets take p2: majority in 3 out of 5 regions in {R4, R5, R6, R7, R8}
    // from the example above. The vector will contain commit indexes
    // corresponding to each of the regions R4, R5, R6, R7 and R8 that have
    // a majority of responses.
    std::vector<int64_t> regional_commit_indexes;

    for (const std::string& region : rule_predicate.regions()) {
      int total_voters = FindOrDie(voter_distribution, region);
      int commit_req = MajoritySize(total_voters);

      std::map<std::string, std::vector<int64_t> >::const_iterator it =
          watermarks_by_region.find(region);

      // If we haven't got responses from enough number of servers in region,
      // we simply move on.
      if (it == watermarks_by_region.end() || it->second.size() < commit_req) {
        if (VLOG_IS_ON(3)) {
          VLOG_WITH_PREFIX_UNLOCKED(3)
              << "Skipping region: " << region
              << ", Majority size: " << commit_req
              << ", Servers responded: "
              << ((it == watermarks_by_region.end()) ? 0 : it->second.size());
        }
        continue;
      }

      const std::vector<int64_t>& watermarks_in_region = it->second;

      // Computing the commit index in each region: the 'commit_req'-th largest
      // watermark, i.e. the highest index that a majority has reached.
      // NOTE(review): this indexing presumes each per-region vector arrives
      // sorted in ascending order; it is const here so it cannot be sorted in
      // place -- confirm that the caller sorts it.
      int64_t regional_commit_index = watermarks_in_region[
          watermarks_in_region.size() - commit_req];

      if (VLOG_IS_ON(3)) {
        VLOG_WITH_PREFIX_UNLOCKED(3)
            << "Watermarks in region: " << region;
        for (int64_t watermark_it : watermarks_in_region) {
          VLOG_WITH_PREFIX_UNLOCKED(3)
              << "Watermark: " << watermark_it;
        }
        VLOG_WITH_PREFIX_UNLOCKED(3)
            << "Regional commit index: " << regional_commit_index;
      }
      regional_commit_indexes.push_back(regional_commit_index);
    }

    // If we haven't got enough majorities in regions listed in the predicate,
    // we simply move on.
    if (regional_commit_indexes.size() < regions_subset_size) {
      if (VLOG_IS_ON(3)) {
        VLOG_WITH_PREFIX_UNLOCKED(3)
            << "Skipping predicate."
            << " Number of regions required: " << regions_subset_size
            << ", Number of regions responded: "
            << regional_commit_indexes.size();
      }
      continue;
    }

    // Computing the commit index as per the predicate.
    // The regional commit indexes were appended in the order the regions are
    // listed in the predicate, which is unrelated to their magnitude. They
    // must be sorted in ascending order so that the element at
    // [size - regions_subset_size] really is the 'regions_subset_size'-th
    // largest value, i.e. the highest index reached by at least that many
    // regions. Without the sort an arbitrary (possibly too high) index could
    // be selected.
    std::sort(regional_commit_indexes.begin(), regional_commit_indexes.end());
    int64_t predicate_commit_index =
        regional_commit_indexes[regional_commit_indexes.size() -
                                regions_subset_size];

    if (VLOG_IS_ON(3)) {
      VLOG_WITH_PREFIX_UNLOCKED(3)
          << "Watermarks in regions: ";
      for (int64_t watermark_it : regional_commit_indexes) {
        VLOG_WITH_PREFIX_UNLOCKED(3)
            << "Watermark: " << watermark_it;
      }
      VLOG_WITH_PREFIX_UNLOCKED(3)
          << "Predicate commit index: " << predicate_commit_index;
    }
    predicate_commit_indexes.push_back(predicate_commit_index);
  }

  int64_t old_watermark = *watermark;

  if (mode == QuorumMode::STATIC_DISJUNCTION) {
    // Maximum commit index is chosen from the predicate commit indexes
    // because of disjunction.
    const std::vector<int64_t>::const_iterator it =
        std::max_element(predicate_commit_indexes.begin(),
                         predicate_commit_indexes.end());
    // Checking the possibility that predicate_commit_indexes
    // can be empty in case we didn't get majorities from enough
    // regions for any of the predicates.
    if (it != predicate_commit_indexes.end()) {
      *watermark = *it;
    } else if (VLOG_IS_ON(3)) {
      VLOG_WITH_PREFIX_UNLOCKED(3)
          << "None of the predicates have got enough majorities.";
    }
  }

  if (mode == QuorumMode::STATIC_CONJUNCTION) {
    // We only compute the new commit index if the all of the predicates have
    // contributed to the predicate_commit_indexes vector with their individual
    // commit_index. We cannot afford to overlook any single predicate in
    // the conjunctive mode.
    if (predicate_commit_indexes.size() == rule_predicates.size()) {
      // Minimum commit index is chosen from the predicate commit indexes
      // because of conjunction.
      const std::vector<int64_t>::const_iterator it =
          std::min_element(predicate_commit_indexes.begin(),
                           predicate_commit_indexes.end());
      CHECK(it != predicate_commit_indexes.end());
      *watermark = *it;
    } else if (VLOG_IS_ON(3)) {
      VLOG_WITH_PREFIX_UNLOCKED(3)
          << "At least one of the predicates hasn't got enough majorities."
          << " Number of predicates: " << rule_predicates.size()
          << " Number of predicates with enough majorities: "
          << predicate_commit_indexes.size();
    }
  }
  return old_watermark;
}