in zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java [545:858]
protected void syncWithLeader(long newLeaderZxid) throws Exception {
    // Overview
    // --------
    // Synchronizes this learner's database with the leader in three phases:
    //   1. Read the first packet, which selects the sync mode (DIFF, SNAP or TRUNC), and
    //      prepare the local database accordingly.
    //   2. Read packets in a loop (PROPOSAL, COMMIT[ANDACTIVATE], INFORM[ANDACTIVATE],
    //      NEWLEADER) until UPTODATE arrives or the peer stops running.
    //   3. Send the final ACK, start the server, update the election vote and hand the
    //      still-queued transactions to the follower/observer server.
    //
    // newLeaderZxid: its epoch becomes this peer's current epoch, and it is the zxid used
    // to acknowledge NEWLEADER.
    //
    // Throws IOException when the snapshot signature is wrong or when a COMMIT arrives while
    // no proposal is outstanding, and a plain Exception when processReconfig reports a major
    // change. Exceptions raised by the helpers called here propagate unchanged.
    QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
    QuorumPacket qp = new QuorumPacket();
    long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);

    QuorumVerifier newLeaderQV = null;

    // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
    // For SNAP and TRUNC the snapshot is needed to save that history
    boolean snapshotNeeded = true;
    boolean syncSnapshot = false;
    readPacket(qp);
    // zxids that the leader has committed but that have not been committed locally yet
    Deque<Long> packetsCommitted = new ArrayDeque<>();
    // proposals received from the leader that have not been written to the txn log yet
    Deque<PacketInFlight> packetsNotLogged = new ArrayDeque<>();
    synchronized (zk) {
        if (qp.getType() == Leader.DIFF) {
            LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
            self.setSyncMode(QuorumPeer.SyncMode.DIFF);
            if (zk.shouldForceWriteInitialSnapshotAfterLeaderElection()) {
                LOG.info("Forcing a snapshot write as part of upgrading from an older Zookeeper. This should only happen while upgrading.");
                snapshotNeeded = true;
                syncSnapshot = true;
            } else {
                snapshotNeeded = false;
            }
        } else if (qp.getType() == Leader.SNAP) {
            self.setSyncMode(QuorumPeer.SyncMode.SNAP);
            LOG.info("Getting a snapshot from leader 0x{}", Long.toHexString(qp.getZxid()));
            // The leader is going to dump the database
            // db is clear as part of deserializeSnapshot()
            zk.getZKDatabase().deserializeSnapshot(leaderIs);
            // ZOOKEEPER-2819: overwrite config node content extracted
            // from leader snapshot with local config, to avoid potential
            // inconsistency of config node content during rolling restart.
            if (!self.isReconfigEnabled()) {
                LOG.debug("Reset config node content from local config after deserialization of snapshot.");
                zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
            }
            String signature = leaderIs.readString("signature");
            // Constant-first comparison: a null signature is reported as a missing
            // signature instead of surfacing as a NullPointerException.
            if (!"BenWasHere".equals(signature)) {
                LOG.error("Missing signature. Got {}", signature);
                throw new IOException("Missing signature");
            }
            zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());

            // immediately persist the latest snapshot when there is txn log gap
            syncSnapshot = true;
        } else if (qp.getType() == Leader.TRUNC) {
            //we need to truncate the log to the lastzxid of the leader
            self.setSyncMode(QuorumPeer.SyncMode.TRUNC);
            LOG.warn("Truncating log to get in sync with the leader 0x{}", Long.toHexString(qp.getZxid()));
            boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
            if (!truncated) {
                // not able to truncate the log
                LOG.error("Not able to truncate the log 0x{}", Long.toHexString(qp.getZxid()));
                ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
            }
            zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());

        } else {
            LOG.error("Got unexpected packet from leader: {}, exiting ... ", LearnerHandler.packetToString(qp));
            ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
        }
        // Done for every sync mode, after the mode-specific preparation above.
        zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
        zk.createSessionTracker();

        // zxid of the most recently queued proposal; only used to warn about gaps
        long lastQueued = 0;

        // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0
        // we take the snapshot on the UPTODATE message, since Zab V1.0 also gets the UPTODATE (after the NEWLEADER)
        // we need to make sure that we don't take the snapshot twice.
        boolean isPreZAB1_0 = true;
        //If we are not going to take the snapshot be sure the transactions are not applied in memory
        // but written out to the transaction log
        boolean writeToTxnLog = !snapshotNeeded;
        TxnLogEntry logEntry;
        // we are now going to start getting transactions to apply followed by an UPTODATE
        outerLoop:
        while (self.isRunning()) {
            readPacket(qp);
            switch (qp.getType()) {
            case Leader.PROPOSAL:
                PacketInFlight pif = new PacketInFlight();
                logEntry = SerializeUtils.deserializeTxn(qp.getData());
                pif.hdr = logEntry.getHeader();
                pif.rec = logEntry.getTxn();
                pif.digest = logEntry.getDigest();
                if (pif.hdr.getZxid() != lastQueued + 1) {
                    LOG.warn(
                        "Got zxid 0x{} expected 0x{}",
                        Long.toHexString(pif.hdr.getZxid()),
                        Long.toHexString(lastQueued + 1));
                }
                lastQueued = pif.hdr.getZxid();

                if (pif.hdr.getType() == OpCode.reconfig) {
                    SetDataTxn setDataTxn = (SetDataTxn) pif.rec;
                    QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8));
                    self.setLastSeenQuorumVerifier(qv, true);
                }

                packetsNotLogged.add(pif);
                break;
            case Leader.COMMIT:
            case Leader.COMMITANDACTIVATE:
                pif = packetsNotLogged.peekFirst();
                if (pif == null) {
                    // peekFirst() returns null on an empty deque. Abort the sync with a
                    // descriptive error instead of a bare NullPointerException on pif.hdr.
                    throw new IOException(
                        "Got commit 0x" + Long.toHexString(qp.getZxid())
                            + " from leader, but there is no outstanding proposal");
                }
                if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == Leader.COMMITANDACTIVATE) {
                    QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8));
                    boolean majorChange = self.processReconfig(
                        qv,
                        ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(),
                        true);
                    if (majorChange) {
                        throw new Exception("changes proposed in reconfig");
                    }
                }
                if (!writeToTxnLog) {
                    // Apply directly in memory, but only when the commit matches the
                    // oldest outstanding proposal.
                    if (pif.hdr.getZxid() != qp.getZxid()) {
                        LOG.warn(
                            "Committing 0x{}, but next proposal is 0x{}",
                            Long.toHexString(qp.getZxid()),
                            Long.toHexString(pif.hdr.getZxid()));
                    } else {
                        zk.processTxn(pif.hdr, pif.rec);
                        packetsNotLogged.remove();
                    }
                } else {
                    // Remember the commit; it is replayed once the proposal has been logged.
                    packetsCommitted.add(qp.getZxid());
                }
                break;
            case Leader.INFORM:
            case Leader.INFORMANDACTIVATE:
                PacketInFlight packet = new PacketInFlight();

                if (qp.getType() == Leader.INFORMANDACTIVATE) {
                    // Payload layout read here: a long (suggested leader id) followed by
                    // the serialized txn.
                    ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
                    long suggestedLeaderId = buffer.getLong();
                    byte[] remainingdata = new byte[buffer.remaining()];
                    buffer.get(remainingdata);
                    logEntry = SerializeUtils.deserializeTxn(remainingdata);
                    packet.hdr = logEntry.getHeader();
                    packet.rec = logEntry.getTxn();
                    packet.digest = logEntry.getDigest();
                    QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) packet.rec).getData(), UTF_8));
                    boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
                    if (majorChange) {
                        throw new Exception("changes proposed in reconfig");
                    }
                } else {
                    logEntry = SerializeUtils.deserializeTxn(qp.getData());
                    packet.rec = logEntry.getTxn();
                    packet.hdr = logEntry.getHeader();
                    packet.digest = logEntry.getDigest();
                    // Log warning message if txn comes out-of-order
                    if (packet.hdr.getZxid() != lastQueued + 1) {
                        LOG.warn(
                            "Got zxid 0x{} expected 0x{}",
                            Long.toHexString(packet.hdr.getZxid()),
                            Long.toHexString(lastQueued + 1));
                    }
                    lastQueued = packet.hdr.getZxid();
                }
                if (!writeToTxnLog) {
                    // Apply to db directly if we haven't taken the snapshot
                    zk.processTxn(packet.hdr, packet.rec);
                } else {
                    packetsNotLogged.add(packet);
                    packetsCommitted.add(qp.getZxid());
                }

                break;
            case Leader.UPTODATE:
                LOG.info("Learner received UPTODATE message");
                if (newLeaderQV != null) {
                    boolean majorChange = self.processReconfig(newLeaderQV, null, null, true);
                    if (majorChange) {
                        throw new Exception("changes proposed in reconfig");
                    }
                }
                if (isPreZAB1_0) {
                    // No NEWLEADER was seen in this loop, so the snapshot and the epoch
                    // update have not happened yet.
                    zk.takeSnapshot(syncSnapshot);
                    self.setCurrentEpoch(newEpoch);
                }
                self.setZooKeeperServer(zk);
                self.adminServer.setZooKeeperServer(zk);
                break outerLoop;
            case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
                // means this is Zab 1.0
                LOG.info("Learner received NEWLEADER message");
                if (qp.getData() != null && qp.getData().length > 1) {
                    try {
                        QuorumVerifier qv = self.configFromString(new String(qp.getData(), UTF_8));
                        self.setLastSeenQuorumVerifier(qv, true);
                        newLeaderQV = qv;
                    } catch (Exception e) {
                        // Still tolerated as before, but reported through the logger
                        // instead of printStackTrace().
                        LOG.warn("Failed to process the quorum verifier carried by the NEWLEADER packet", e);
                    }
                }

                if (snapshotNeeded) {
                    zk.takeSnapshot(syncSnapshot);
                }

                writeToTxnLog = true;
                //Anything after this needs to go to the transaction log, not applied directly in memory
                isPreZAB1_0 = false;

                // ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER).
                if (zk instanceof FollowerZooKeeperServer && !packetsCommitted.isEmpty()) {
                    long startTime = Time.currentElapsedTime();
                    FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;

                    /*
                     * @see https://github.com/apache/zookeeper/pull/1848
                     * Persist and process the committed txns in "packetsNotLogged"
                     * according to "packetsCommitted", which have been committed by
                     * the leader. For these committed proposals, there is no need to
                     * reply ack.
                     *
                     * @see https://issues.apache.org/jira/browse/ZOOKEEPER-4394
                     * Keep the outstanding proposals in "packetsNotLogged" to avoid
                     * NullPointerException when the follower receives COMMIT packet(s)
                     * right after replying NEWLEADER ack.
                     */
                    while (!packetsCommitted.isEmpty()) {
                        long zxid = packetsCommitted.removeFirst();
                        pif = packetsNotLogged.peekFirst();
                        if (pif == null) {
                            LOG.warn("Committing 0x{}, but got no proposal", Long.toHexString(zxid));
                            continue;
                        } else if (pif.hdr.getZxid() != zxid) {
                            LOG.warn("Committing 0x{}, but next proposal is 0x{}",
                                Long.toHexString(zxid), Long.toHexString(pif.hdr.getZxid()));
                            continue;
                        }
                        packetsNotLogged.removeFirst();
                        fzk.appendRequest(pif.hdr, pif.rec, pif.digest);
                        fzk.processTxn(pif.hdr, pif.rec);
                    }

                    // @see https://issues.apache.org/jira/browse/ZOOKEEPER-4646
                    // Make sure to persist the txns to disk before replying NEWLEADER ack.
                    fzk.getZKDatabase().commit();
                    LOG.info("It took {}ms to persist and commit txns in packetsCommitted. "
                            + "{} outstanding txns left in packetsNotLogged",
                        Time.currentElapsedTime() - startTime, packetsNotLogged.size());
                }

                // @see https://issues.apache.org/jira/browse/ZOOKEEPER-4643
                // @see https://issues.apache.org/jira/browse/ZOOKEEPER-4785
                // Update current epoch after the committed txns are persisted
                self.setCurrentEpoch(newEpoch);
                LOG.info("Set the current epoch to {}", newEpoch);
                sock.setSoTimeout(self.tickTime * self.syncLimit);
                self.setSyncMode(QuorumPeer.SyncMode.NONE);

                // send NEWLEADER ack after the committed txns are persisted
                writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                LOG.info("Sent NEWLEADER ack to leader with zxid {}", Long.toHexString(newLeaderZxid));
                break;
            }
        }
    }
    // Final ack carries the first zxid of the new epoch (counter 0).
    ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
    writePacket(ack, true);
    zk.startup();
    /*
     * Update the election vote here to ensure that all members of the
     * ensemble report the same vote to new servers that start up and
     * send leader election notifications to the ensemble.
     *
     * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
     */
    self.updateElectionVote(newEpoch);

    // We need to log the stuff that came in between the snapshot and the uptodate
    if (zk instanceof FollowerZooKeeperServer) {
        FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
        for (PacketInFlight p : packetsNotLogged) {
            fzk.logRequest(p.hdr, p.rec, p.digest);
        }
        LOG.info("{} txns have been logged asynchronously", packetsNotLogged.size());

        for (Long zxid : packetsCommitted) {
            fzk.commit(zxid);
        }
        LOG.info("{} txns have been committed", packetsCommitted.size());
    } else if (zk instanceof ObserverZooKeeperServer) {
        // Similar to follower, we need to log requests between the snapshot
        // and UPTODATE
        ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
        for (PacketInFlight p : packetsNotLogged) {
            Long zxid = packetsCommitted.peekFirst();
            if (zxid == null) {
                // No commits are left at all. Comparing against the null would throw
                // on unboxing, so treat it like any other proposal without a matching
                // commit: warn and skip it.
                LOG.warn(
                    "Got proposal 0x{}, but there is no commit left to match it",
                    Long.toHexString(p.hdr.getZxid()));
                continue;
            }
            if (p.hdr.getZxid() != zxid) {
                // log warning message if there is no matching commit
                // old leader send outstanding proposal to observer
                LOG.warn(
                    "Committing 0x{}, but next proposal is 0x{}",
                    Long.toHexString(zxid),
                    Long.toHexString(p.hdr.getZxid()));
                continue;
            }
            packetsCommitted.remove();
            Request request = new Request(p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), p.hdr, p.rec, -1);
            request.setTxnDigest(p.digest);
            ozk.commitRequest(request);
        }
    } else {
        // New server type need to handle in-flight packets
        throw new UnsupportedOperationException("Unknown server type");
    }
}