in zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java [463:740]
/**
 * Drives one learner connection from the first handshake packet until the
 * connection ends.
 *
 * <p>Phases, in the order they appear below:
 * <ol>
 *   <li>Register this handler with {@code learnerMaster} and set up the
 *       binary input/output archives.</li>
 *   <li>Read the first packet, which must be FOLLOWERINFO or OBSERVERINFO,
 *       and decode sid / protocol version / config version from its payload.</li>
 *   <li>Negotiate the epoch: for protocol versions &gt;= 0x10000 send LEADERINFO
 *       and expect ACKEPOCH; for older versions synthesize the state summary
 *       from the first packet's zxid.</li>
 *   <li>Call {@code syncFollower} and either stream a full snapshot or account
 *       for a diff sync, each guarded by its own sync throttler.</li>
 *   <li>Send NEWLEADER, start the packet sender, and wait for the learner's ACK.</li>
 *   <li>End the sync, switch the socket to the sync timeout, wait for the
 *       learner master to start up, and queue UPTODATE.</li>
 *   <li>Loop forever dispatching incoming ACK / PING / REVALIDATE / REQUEST
 *       packets. The loop has no normal exit; it ends only via an exception.</li>
 * </ol>
 *
 * <p>Whatever the exit path (early {@code return}, exception, rethrow), the
 * {@code finally} block ends any sync still in progress, logs the goodbye
 * banner, dumps the message tracker and calls {@code shutdown()}.
 */
public void run() {
try {
// --- Phase 1: registration and stream setup ---
learnerMaster.addLearnerHandler(this);
tickOfNextAckDeadline = learnerMaster.getTickOfInitialAckDeadline();
ia = BinaryInputArchive.getArchive(bufferedInput);
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
oa = BinaryOutputArchive.getArchive(bufferedOutput);
// --- Phase 2: first packet must identify the learner ---
QuorumPacket qp = new QuorumPacket();
ia.readRecord(qp, "packet");
messageTracker.trackReceived(qp.getType());
if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO) {
LOG.error("First packet {} is not FOLLOWERINFO or OBSERVERINFO!", qp.toString());
// the finally block below still runs and shuts this handler down
return;
}
// An ObserverMaster only accepts observers; the IOException is handled by the
// catch (IOException) clause at the bottom of this method.
if (learnerMaster instanceof ObserverMaster && qp.getType() != Leader.OBSERVERINFO) {
throw new IOException("Non observer attempting to connect to ObserverMaster. type = " + qp.getType());
}
// Payload layout, read sequentially from the same buffer:
//   bytes 0..7   sid (long)
//   bytes 8..11  protocol version (int), only if at least 12 bytes were sent
//   bytes 12..19 config version (long), only if at least 20 bytes were sent
byte[] learnerInfoData = qp.getData();
if (learnerInfoData != null) {
ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
if (learnerInfoData.length >= 8) {
this.sid = bbsid.getLong();
}
if (learnerInfoData.length >= 12) {
this.version = bbsid.getInt(); // protocolVersion
}
if (learnerInfoData.length >= 20) {
long configVersion = bbsid.getLong();
// refuse a learner whose config version is newer than this learner master's
if (configVersion > learnerMaster.getQuorumVerifierVersion()) {
throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
}
}
} else {
// no payload at all: take the sid from the learner master's follower counter
this.sid = learnerMaster.getAndDecrementFollowerCounter();
}
// Informational logging only; an empty peer info string is logged as
// "not in the current config".
String followerInfo = learnerMaster.getPeerInfo(this.sid);
if (followerInfo.isEmpty()) {
LOG.info(
"Follower sid: {} not in the current config {}",
this.sid,
Long.toHexString(learnerMaster.getQuorumVerifierVersion()));
} else {
LOG.info("Follower sid: {} : info : {}", this.sid, followerInfo);
}
if (qp.getType() == Leader.OBSERVERINFO) {
learnerType = LearnerType.OBSERVER;
}
learnerMaster.registerLearnerHandlerBean(this, sock);
// --- Phase 3: epoch negotiation ---
// The learner's last accepted epoch is taken from the zxid of its first packet.
long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
long peerLastZxid;
StateSummary ss = null;
long zxid = qp.getZxid();
long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
// zxid for the new epoch with a counter of 0; sent in LEADERINFO and NEWLEADER
long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);
if (this.getVersion() < 0x10000) {
// Older protocol: no LEADERINFO/ACKEPOCH exchange happens in this branch.
// we are going to have to extrapolate the epoch information
long epoch = ZxidUtils.getEpochFromZxid(zxid);
ss = new StateSummary(epoch, zxid);
// fake the message
learnerMaster.waitForEpochAck(this.getSid(), ss);
} else {
// LEADERINFO carries the new-epoch zxid and a 4-byte payload holding 0x10000
byte[] ver = new byte[4];
ByteBuffer.wrap(ver).putInt(0x10000);
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
oa.writeRecord(newEpochPacket, "packet");
messageTracker.trackSent(Leader.LEADERINFO);
bufferedOutput.flush();
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");
messageTracker.trackReceived(ackEpochPacket.getType());
if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
LOG.error("{} is not ACKEPOCH", ackEpochPacket.toString());
return;
}
// The first int of the ACKEPOCH payload is used as the epoch of the state summary.
// NOTE(review): getData() is not null-checked; ByteBuffer.wrap(null) throws a
// NullPointerException - confirm learners always send a payload with ACKEPOCH.
ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
learnerMaster.waitForEpochAck(this.getSid(), ss);
}
peerLastZxid = ss.getLastZxid();
// --- Phase 4: synchronize the learner ---
// Take any necessary action if we need to send TRUNC or DIFF
// startForwarding() will be called in all cases
boolean needSnap = syncFollower(peerLastZxid, learnerMaster);
// syncs between followers and the leader are exempt from throttling because it
// is important to keep the state of quorum servers up-to-date. The exempted syncs
// are counted as concurrent syncs though
boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
/* if we are not truncating or sending a diff just send a snapshot */
if (needSnap) {
// The throttler field is assigned before beginSync() is called; the
// SyncThrottleException handler below clears it again so that the finally
// block does not call endSync() in that case.
syncThrottler = learnerMaster.getLearnerSnapSyncThrottler();
syncThrottler.beginSync(exemptFromThrottle);
ServerMetrics.getMetrics().INFLIGHT_SNAP_COUNT.add(syncThrottler.getSyncInProgress());
try {
long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
messageTracker.trackSent(Leader.SNAP);
bufferedOutput.flush();
LOG.info(
"Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
+ "send zxid of db as 0x{}, {} concurrent snapshot sync, "
+ "snapshot sync was {} from throttle",
Long.toHexString(peerLastZxid),
Long.toHexString(leaderLastZxid),
Long.toHexString(zxidToSend),
syncThrottler.getSyncInProgress(),
exemptFromThrottle ? "exempt" : "not exempt");
// Dump data to peer
learnerMaster.getZKDatabase().serializeSnapshot(oa);
// trailing marker string written right after the serialized snapshot
oa.writeString("BenWasHere", "signature");
bufferedOutput.flush();
} finally {
// counted even when sending the snapshot throws
ServerMetrics.getMetrics().SNAP_COUNT.add(1);
}
} else {
// No snapshot is streamed here; this branch only does throttling and metrics.
syncThrottler = learnerMaster.getLearnerDiffSyncThrottler();
syncThrottler.beginSync(exemptFromThrottle);
ServerMetrics.getMetrics().INFLIGHT_DIFF_COUNT.add(syncThrottler.getSyncInProgress());
ServerMetrics.getMetrics().DIFF_COUNT.add(1);
}
// --- Phase 5: NEWLEADER and its ACK ---
LOG.debug("Sending NEWLEADER message to {}", sid);
// the version of this quorumVerifier will be set by leader.lead() in case
// the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
// we got here, so the version was set
if (getVersion() < 0x10000) {
// older protocol: NEWLEADER has no payload and is written directly to the stream
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null);
oa.writeRecord(newLeaderQP, "packet");
} else {
// newer protocol: NEWLEADER carries the quorum verifier bytes and goes through
// queuedPackets instead of being written directly
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, learnerMaster.getQuorumVerifierBytes(), null);
queuedPackets.add(newLeaderQP);
}
bufferedOutput.flush();
// Start thread that blast packets in the queue to learner
startSendingPackets();
/*
* Have to wait for the first ACK, wait until
* the learnerMaster is ready, and only then we can
* start processing messages.
*/
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
messageTracker.trackReceived(qp.getType());
if (qp.getType() != Leader.ACK) {
LOG.error("Next packet was supposed to be an ACK, but received packet: {}", packetToString(qp));
// syncThrottler is still set here, so the finally block ends the sync
return;
}
LOG.debug("Received NEWLEADER-ACK message from {}", sid);
learnerMaster.waitForNewLeaderAck(getSid(), qp.getZxid());
syncLimitCheck.start();
// --- Phase 6: sync finished, prepare for steady state ---
// sync ends when NEWLEADER-ACK is received
syncThrottler.endSync();
if (needSnap) {
ServerMetrics.getMetrics().INFLIGHT_SNAP_COUNT.add(syncThrottler.getSyncInProgress());
} else {
ServerMetrics.getMetrics().INFLIGHT_DIFF_COUNT.add(syncThrottler.getSyncInProgress());
}
// cleared so the finally block does not call endSync() a second time
syncThrottler = null;
// now that the ack has been processed expect the syncLimit
sock.setSoTimeout(learnerMaster.syncTimeout());
/*
* Wait until learnerMaster starts up
*/
learnerMaster.waitForStartup();
// Mutation packets will be queued during the serialize,
// so we need to mark when the peer can actually start
// using the data
//
LOG.debug("Sending UPTODATE message to {}", sid);
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
// --- Phase 7: steady-state packet loop ---
// No statement breaks out of this loop (the breaks below belong to the switch);
// it terminates only when a read or a handler throws.
while (true) {
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
messageTracker.trackReceived(qp.getType());
if (LOG.isTraceEnabled()) {
// PING packets are traced under their own mask
long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
if (qp.getType() == Leader.PING) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
}
// every received packet, whatever its type, refreshes the ack deadline
tickOfNextAckDeadline = learnerMaster.getTickOfNextAckDeadline();
packetsReceived.incrementAndGet();
ByteBuffer bb;
long sessionId;
int cxid;
int type;
switch (qp.getType()) {
case Leader.ACK:
if (this.learnerType == LearnerType.OBSERVER) {
LOG.debug("Received ACK from Observer {}", this.sid);
}
syncLimitCheck.updateAck(qp.getZxid());
learnerMaster.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
break;
case Leader.PING:
// Process the touches
// The payload is read as repeated (long, int) pairs, each handed to
// learnerMaster.touch(); presumably session id and session timeout - confirm.
// NOTE(review): getData() is not null-checked before being wrapped.
ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
DataInputStream dis = new DataInputStream(bis);
while (dis.available() > 0) {
long sess = dis.readLong();
int to = dis.readInt();
learnerMaster.touch(sess, to);
}
break;
case Leader.REVALIDATE:
ServerMetrics.getMetrics().REVALIDATE_COUNT.add(1);
learnerMaster.revalidateSession(qp, this);
break;
case Leader.REQUEST:
// Payload layout: long sessionId, int cxid, int type, then the request
// record itself (everything after the 16-byte header, via slice()).
// NOTE(review): getData() is not null-checked before being wrapped.
bb = ByteBuffer.wrap(qp.getData());
sessionId = bb.getLong();
cxid = bb.getInt();
type = bb.getInt();
bb = bb.slice();
Request si;
// sync requests get a LearnerSyncRequest that references this handler;
// everything else becomes a plain Request whose first argument is null
if (type == OpCode.sync) {
si = new LearnerSyncRequest(this, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo());
} else {
si = new Request(null, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo());
}
si.setOwner(this);
learnerMaster.submitLearnerRequest(si);
requestsReceived.incrementAndGet();
break;
default:
// unknown packet types are logged and otherwise ignored
LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
break;
}
}
} catch (IOException e) {
// I/O failure (including the IOExceptions thrown explicitly above): log and
// close the socket before the finally block runs
LOG.error("Unexpected exception in LearnerHandler: ", e);
closeSocket();
} catch (InterruptedException e) {
// logged only; the interrupt flag is not restored here
LOG.error("Unexpected exception in LearnerHandler.", e);
} catch (SyncThrottleException e) {
LOG.error("too many concurrent sync.", e);
// Clearing the field makes the finally block skip endSync();
// presumably this exception comes from beginSync() refusing the sync - confirm.
syncThrottler = null;
} catch (Exception e) {
// anything else is logged and rethrown after the finally block has run
LOG.error("Unexpected exception in LearnerHandler.", e);
throw e;
} finally {
// a non-null throttler here means a sync was begun but never ended above
if (syncThrottler != null) {
syncThrottler.endSync();
syncThrottler = null;
}
String remoteAddr = getRemoteAddress();
LOG.warn("******* GOODBYE sid:{} {} ********", getSid(), remoteAddr);
messageTracker.dumpToLog(remoteAddr);
shutdown();
}
}