private ResultAndTerm waitForResults()

in ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java [382:468]


  /**
   * Collects the replies of one round of vote requests and decides the outcome of that round.
   *
   * <p>Completed requests are taken from {@code voteExecutor} one at a time until
   * {@code submitted} of them have been counted, {@code shouldRun(electionTerm)} returns false,
   * or a deadline of one random election timeout (measured from the start of this call) expires.
   * A poll that yields nothing and a second reply from an already-seen peer are not counted;
   * a request that failed with an {@link ExecutionException} is recorded and counted.
   *
   * <p>Outcomes, in the order they are checked for each new reply:
   * <ul>
   *   <li>{@code SHUTDOWN} - the reply has its should-shutdown flag set;</li>
   *   <li>{@code DISCOVERED_A_NEW_TERM} - the reply carries a term greater than
   *       {@code electionTerm}; that term is handed to {@code logAndReturn};</li>
   *   <li>{@code REJECTED} - a higher-priority peer refused the vote (ignored in single mode),
   *       or {@code conf.majorityRejectVotes} holds for the refusing peers;</li>
   *   <li>{@code PASSED} - every higher-priority peer has replied and the granting peers form
   *       a majority.</li>
   * </ul>
   * When the deadline expires or the loop ends without such a decision, the result is
   * {@code PASSED} if the granting peers form a majority, else {@code SINGLE_MODE_PASSED} if
   * the configuration is in single mode for this server, else {@code TIMEOUT} (deadline) or
   * {@code REJECTED} (loop ended).
   *
   * @param phase        the election phase, passed through to {@code logAndReturn}
   * @param electionTerm the term this election is running in
   * @param submitted    the number of vote requests whose completion is awaited
   * @param conf         the configuration used for majority, single-mode and priority checks
   * @param voteExecutor the executor polled for completed vote requests
   * @return the result produced by {@code logAndReturn} for the decided outcome
   * @throws InterruptedException if interrupted while waiting for a reply
   */
  private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitted,
      RaftConfigurationImpl conf, Executor voteExecutor) throws InterruptedException {
    final Timestamp deadline = Timestamp.currentTime().addTime(server.getRandomElectionTimeout());
    final Map<RaftPeerId, RequestVoteReplyProto> responses = new HashMap<>();
    final List<Exception> exceptions = new ArrayList<>();
    final Collection<RaftPeerId> grantedBy = new ArrayList<>();
    final Collection<RaftPeerId> deniedBy = new ArrayList<>();
    final Set<RaftPeerId> pendingHigherPriority = getHigherPriorityPeers(conf);
    final boolean singleMode = conf.isSingleMode(server.getId());

    int outstanding = submitted;
    while (outstanding > 0 && shouldRun(electionTerm)) {
      // Time left until the deadline: the negated elapsed time since the deadline.
      final TimeDuration remaining = deadline.elapsedTime().apply(n -> -n);
      if (remaining.isNonPositive()) {
        final Result atDeadline;
        if (conf.hasMajority(grantedBy, server.getId())) {
          // A majority is enough at the deadline, even if some higher-priority peer
          // has not responded yet.
          atDeadline = Result.PASSED;
        } else {
          // Without a majority, only a candidate in single mode still passes.
          atDeadline = singleMode ? Result.SINGLE_MODE_PASSED : Result.TIMEOUT;
        }
        return logAndReturn(phase, atDeadline, responses, exceptions);
      }

      try {
        final Future<RequestVoteReplyProto> completed = voteExecutor.poll(remaining);
        if (completed == null) {
          // Nothing completed within the remaining time; the next iteration
          // re-evaluates the deadline. This poll is not counted.
          continue;
        }

        final RequestVoteReplyProto reply = completed.get();
        final RaftPeerId replier = RaftPeerId.valueOf(reply.getServerReply().getReplyId());
        final RequestVoteReplyProto earlier = responses.putIfAbsent(replier, reply);
        if (earlier != null) {
          // Only the first reply of a peer is considered; a duplicate is not counted.
          if (LOG.isWarnEnabled()) {
            LOG.warn("{} received duplicated replies from {}, the 2nd reply is ignored: 1st={}, 2nd={}",
                this, replier,
                ServerStringUtils.toRequestVoteReplyString(earlier),
                ServerStringUtils.toRequestVoteReplyString(reply));
          }
          continue;
        }

        if (reply.getShouldShutdown()) {
          return logAndReturn(phase, Result.SHUTDOWN, responses, exceptions);
        }
        if (reply.getTerm() > electionTerm) {
          return logAndReturn(phase, Result.DISCOVERED_A_NEW_TERM, responses, exceptions, reply.getTerm());
        }

        final boolean granted = reply.getServerReply().getSuccess();

        // A refusal from a higher-priority peer ends the round, except in single mode.
        if (!granted && !singleMode && pendingHigherPriority.contains(replier)) {
          return logAndReturn(phase, Result.REJECTED, responses, exceptions);
        }

        // This peer has replied; an empty set means all higher-priority peers have replied.
        pendingHigherPriority.remove(replier);

        if (!granted) {
          deniedBy.add(replier);
          if (conf.majorityRejectVotes(deniedBy)) {
            return logAndReturn(phase, Result.REJECTED, responses, exceptions);
          }
        } else {
          grantedBy.add(replier);
          // Pass early only once every higher-priority peer has replied and a majority granted.
          if (pendingHigherPriority.isEmpty() && conf.hasMajority(grantedBy, server.getId())) {
            return logAndReturn(phase, Result.PASSED, responses, exceptions);
          }
        }
      } catch (ExecutionException e) {
        // A failed request is recorded and still counts as accounted for.
        LogUtils.infoOrTrace(LOG, () -> this + " got exception when requesting votes", e);
        exceptions.add(e);
      }
      outstanding--;
    }

    // No early decision was made: either every submitted request has been accounted for,
    // or shouldRun(electionTerm) returned false.
    final Result atEnd;
    if (conf.hasMajority(grantedBy, server.getId())) {
      atEnd = Result.PASSED;
    } else {
      atEnd = singleMode ? Result.SINGLE_MODE_PASSED : Result.REJECTED;
    }
    return logAndReturn(phase, atEnd, responses, exceptions);
  }