in zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FastLeaderElection.java [907:1138]
public Vote lookForLeader() throws InterruptedException {
try {
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
self.start_fle = Time.currentElapsedTime();
try {
/*
* The votes from the current leader election are stored in recvset. In other words, a vote v is in recvset
* if v.electionEpoch == logicalclock. The current participant uses recvset to deduce on whether a majority
* of participants has voted for it.
*/
Map<Long, Vote> recvset = new HashMap<>();
/*
* The votes from previous leader elections, as well as the votes from the current leader election are
* stored in outofelection. Note that notifications in a LOOKING state are not stored in outofelection.
* Only FOLLOWING or LEADING notifications are stored in outofelection. The current participant could use
* outofelection to learn which participant is the leader if it arrives late (i.e., higher logicalclock than
* the electionEpoch of the received notifications) in a leader election.
*/
Map<Long, Vote> outofelection = new HashMap<>();
int notTimeout = minNotificationInterval;
synchronized (this) {
logicalclock.incrementAndGet();
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info(
"New election. My id = {}, proposed zxid=0x{}",
self.getMyId(),
Long.toHexString(proposedZxid));
sendNotifications();
SyncedLearnerTracker voteSet = null;
/*
* Loop in which we exchange notifications until we find a leader
*/
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
if (n == null) {
if (manager.haveDelivered()) {
sendNotifications();
} else {
manager.connectAll();
}
/*
* Exponential backoff
*/
notTimeout = Math.min(notTimeout << 1, maxNotificationInterval);
/*
* When a leader failure happens on a master, the backup will be supposed to receive the honour from
* Oracle and become a leader, but the honour is likely to be delay. We do a re-check once timeout happens
*
* The leader election algorithm does not provide the ability of electing a leader from a single instance
* which is in a configuration of 2 instances.
* */
if (self.getQuorumVerifier() instanceof QuorumOracleMaj
&& self.getQuorumVerifier().revalidateVoteset(voteSet, notTimeout != minNotificationInterval)) {
setPeerState(proposedLeader, voteSet);
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
leaveInstance(endVote);
return endVote;
}
LOG.info("Notification time out: {} ms", notTimeout);
} else if (validVoter(n.sid) && validVoter(n.leader)) {
/*
* Only proceed if the vote comes from a replica in the current or next
* voting view for a replica in the current or next voting view.
*/
switch (n.state) {
case LOOKING:
if (getInitLastLoggedZxid() == -1) {
LOG.debug("Ignoring notification as our zxid is -1");
break;
}
if (n.zxid == -1) {
LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
break;
}
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
recvset.clear();
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
LOG.debug(
"Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
Long.toHexString(n.electionEpoch),
Long.toHexString(logicalclock.get()));
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
LOG.debug(
"Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}",
n.sid,
n.leader,
Long.toHexString(n.zxid),
Long.toHexString(n.electionEpoch));
// don't care about the version if it's in LOOKING state
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
if (voteSet.hasAllQuorums()) {
// Verify if there is any change in the proposed leader
while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
setPeerState(proposedLeader, voteSet);
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
LOG.debug("Notification from observer: {}", n.sid);
break;
/*
* In ZOOKEEPER-3922, we separate the behaviors of FOLLOWING and LEADING.
* To avoid the duplication of codes, we create a method called followingBehavior which was used to
* shared by FOLLOWING and LEADING. This method returns a Vote. When the returned Vote is null, it follows
* the original idea to break switch statement; otherwise, a valid returned Vote indicates, a leader
* is generated.
*
* The reason why we need to separate these behaviors is to make the algorithm runnable for 2-node
* setting. An extra condition for generating leader is needed. Due to the majority rule, only when
* there is a majority in the voteset, a leader will be generated. However, in a configuration of 2 nodes,
* the number to achieve the majority remains 2, which means a recovered node cannot generate a leader which is
* the existed leader. Therefore, we need the Oracle to kick in this situation. In a two-node configuration, the Oracle
* only grants the permission to maintain the progress to one node. The oracle either grants the permission to the
* remained node and makes it a new leader when there is a faulty machine, which is the case to maintain the progress.
* Otherwise, the oracle does not grant the permission to the remained node, which further causes a service down.
*
* In the former case, when a failed server recovers and participate in the leader election, it would not locate a
* new leader because there does not exist a majority in the voteset. It fails on the containAllQuorum() infinitely due to
* two facts. First one is the fact that it does do not have a majority in the voteset. The other fact is the fact that
* the oracle would not give the permission since the oracle already gave the permission to the existed leader, the healthy machine.
* Logically, when the oracle replies with negative, it implies the existed leader which is LEADING notification comes from is a valid leader.
* To threat this negative replies as a permission to generate the leader is the purpose to separate these two behaviors.
*
*
* */
case FOLLOWING:
/*
* To avoid duplicate codes
* */
Vote resultFN = receivedFollowingNotification(recvset, outofelection, voteSet, n);
if (resultFN == null) {
break;
} else {
return resultFN;
}
case LEADING:
/*
* In leadingBehavior(), it performs followingBehvior() first. When followingBehavior() returns
* a null pointer, ask Oracle whether to follow this leader.
* */
Vote resultLN = receivedLeadingNotification(recvset, outofelection, voteSet, n);
if (resultLN == null) {
break;
} else {
return resultLN;
}
default:
LOG.warn("Notification state unrecognized: {} (n.state), {}(n.sid)", n.state, n.sid);
break;
}
} else {
if (!validVoter(n.leader)) {
LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
}
if (!validVoter(n.sid)) {
LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
}
}
}
return null;
} finally {
try {
if (self.jmxLeaderElectionBean != null) {
MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
}
}