in ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java [382:468]
/**
 * Collects the replies to the submitted vote requests and decides the outcome of this round.
 *
 * <p>Replies are consumed one at a time from {@code voteExecutor} until a reply settles the
 * round, the deadline expires, {@code shouldRun(electionTerm)} becomes false, or
 * {@code submitted} replies/failures have been counted.
 *
 * @param phase the election phase, forwarded to {@code logAndReturn}
 * @param electionTerm the term of this election; a reply carrying a larger term ends the round
 * @param submitted the number of vote requests whose outcome is awaited
 * @param conf the configuration used for the majority and single-mode checks
 * @param voteExecutor the source of completed vote-request futures
 * @return the value produced by {@code logAndReturn} for the decided result
 * @throws InterruptedException if the thread is interrupted while waiting for a reply
 */
private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitted,
    RaftConfigurationImpl conf, Executor voteExecutor) throws InterruptedException {
  final Timestamp deadline = Timestamp.currentTime().addTime(server.getRandomElectionTimeout());
  final Map<RaftPeerId, RequestVoteReplyProto> responses = new HashMap<>();
  final List<Exception> exceptions = new ArrayList<>();
  final Collection<RaftPeerId> grantedBy = new ArrayList<>();
  final Collection<RaftPeerId> rejectedBy = new ArrayList<>();
  // Higher priority peers that have not replied yet; entries are removed as replies arrive.
  final Set<RaftPeerId> pendingHigherPriority = getHigherPriorityPeers(conf);
  final boolean singleMode = conf.isSingleMode(server.getId());
  int outstanding = submitted;

  while (outstanding > 0 && shouldRun(electionTerm)) {
    // Time left until the deadline: the negated elapsed time since the deadline.
    final TimeDuration remaining = deadline.elapsedTime().apply(n -> -n);
    if (remaining.isNonPositive()) {
      // The deadline has passed. A majority is enough here even if some higher priority
      // peer never responded.
      if (conf.hasMajority(grantedBy, server.getId())) {
        return logAndReturn(phase, Result.PASSED, responses, exceptions);
      }
      // Without a majority, a candidate in single mode still passes the vote.
      if (singleMode) {
        return logAndReturn(phase, Result.SINGLE_MODE_PASSED, responses, exceptions);
      }
      return logAndReturn(phase, Result.TIMEOUT, responses, exceptions);
    }

    try {
      final Future<RequestVoteReplyProto> completed = voteExecutor.poll(remaining);
      if (completed == null) {
        // Nothing arrived within the remaining time; loop again so that the deadline
        // branch above produces the result.
        continue;
      }

      final RequestVoteReplyProto reply = completed.get();
      final RaftPeerId replier = RaftPeerId.valueOf(reply.getServerReply().getReplyId());
      final RequestVoteReplyProto first = responses.putIfAbsent(replier, reply);
      if (first != null) {
        // Only the first reply of a peer is used; a duplicate is neither evaluated nor
        // counted against the outstanding replies.
        if (LOG.isWarnEnabled()) {
          LOG.warn("{} received duplicated replies from {}, the 2nd reply is ignored: 1st={}, 2nd={}",
              this, replier,
              ServerStringUtils.toRequestVoteReplyString(first),
              ServerStringUtils.toRequestVoteReplyString(reply));
        }
        continue;
      }

      if (reply.getShouldShutdown()) {
        return logAndReturn(phase, Result.SHUTDOWN, responses, exceptions);
      }
      if (reply.getTerm() > electionTerm) {
        return logAndReturn(phase, Result.DISCOVERED_A_NEW_TERM, responses, exceptions, reply.getTerm());
      }

      final boolean granted = reply.getServerReply().getSuccess();
      // Outside single mode, a rejection from any higher priority peer fails the vote.
      if (!granted && !singleMode && pendingHigherPriority.contains(replier)) {
        return logAndReturn(phase, Result.REJECTED, responses, exceptions);
      }

      // This peer has replied; an empty set means every higher priority peer has replied.
      pendingHigherPriority.remove(replier);

      if (granted) {
        grantedBy.add(replier);
        // Pass early only when all higher priority peers have replied and a majority granted.
        if (pendingHigherPriority.isEmpty() && conf.hasMajority(grantedBy, server.getId())) {
          return logAndReturn(phase, Result.PASSED, responses, exceptions);
        }
      } else {
        rejectedBy.add(replier);
        if (conf.majorityRejectVotes(rejectedBy)) {
          return logAndReturn(phase, Result.REJECTED, responses, exceptions);
        }
      }
    } catch (ExecutionException e) {
      // A failed request is recorded and still counts as one processed outcome.
      LogUtils.infoOrTrace(LOG, () -> this + " got exception when requesting votes", e);
      exceptions.add(e);
    }
    outstanding--;
  }

  // The loop ended without an early decision: either every awaited outcome was counted or
  // shouldRun(electionTerm) returned false.
  if (conf.hasMajority(grantedBy, server.getId())) {
    return logAndReturn(phase, Result.PASSED, responses, exceptions);
  }
  if (singleMode) {
    return logAndReturn(phase, Result.SINGLE_MODE_PASSED, responses, exceptions);
  }
  return logAndReturn(phase, Result.REJECTED, responses, exceptions);
}