in modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java [2144:2329]
/**
 * Handles an incoming AppendEntries request: either a heartbeat/probe (no entries) or a batch of log entries
 * to be appended to the local log.
 *
 * <p>The request is validated under the node's write lock in this order: node is active, sender id parses,
 * request term is not stale, sender is the known leader, no snapshot installation is in progress (only checked
 * when entries are present), and the local term at {@code prevLogIndex} matches {@code prevLogTerm}. Any failed
 * check returns a response immediately.
 *
 * <p>Whenever an {@code AppendEntriesResponse} is built here and the request carries a timestamp, that timestamp
 * is passed to {@code clock.update(...)} and the resulting value is placed into the response. Responses created
 * through {@code RaftRpcFactory.DEFAULT.newResponse(...)} do not get a timestamp in this method.
 *
 * @param request The AppendEntries request received from a peer.
 * @param done Closure handed to the {@code FollowerStableClosure} on the append path; it is not used on any
 *      path that returns a response directly.
 * @return The response to send back, or {@code null} when the entries were handed to the log manager. In the
 *      {@code null} case the reply is presumably produced later through {@code done} by the
 *      {@code FollowerStableClosure} (confirm against that class).
 */
public Message handleAppendEntriesRequest(final AppendEntriesRequest request, final RpcRequestClosure done) {
// Whether the finally block still has to release the write lock. Cleared by the heartbeat path, which
// releases the lock itself before touching the ballot box.
boolean doUnlock = true;
// Start of processing, used for the latency metric recorded in the finally block.
final long startMs = Utils.monotonicMs();
this.writeLock.lock();
final int entriesCount = Utils.size(request.entriesList());
// Set only after the entries were handed to the log manager; gates the entries-count metric.
boolean success = false;
try {
// A node that is not active rejects the request outright.
if (!this.state.isActive()) {
LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
return RaftRpcFactory.DEFAULT //
.newResponse(raftOptions.getRaftMessagesFactory(), RaftError.EINVAL,
"Node %s is not in active state, state %s.", getNodeId(), this.state.name());
}
// Identify the sender; an unparsable server id is treated as an invalid request.
final PeerId serverId = new PeerId();
if (!serverId.parse(request.serverId())) {
LOG.warn("Node {} received AppendEntriesRequest from {} serverId bad format.", getNodeId(),
request.serverId());
return RaftRpcFactory.DEFAULT //
.newResponse(raftOptions.getRaftMessagesFactory(), RaftError.EINVAL,
"Parse serverId failed: %s.", request.serverId());
}
// Check stale term
// A request from an older term is refused; the reply carries our current term.
if (request.term() < this.currTerm) {
LOG.warn("Node {} ignore stale AppendEntriesRequest from {}, term={}, currTerm={}.", getNodeId(),
request.serverId(), request.term(), this.currTerm);
AppendEntriesResponseBuilder rb = raftOptions.getRaftMessagesFactory()
.appendEntriesResponse()
.success(false)
.term(this.currTerm);
if (request.timestamp() != null) {
rb.timestampLong(clock.update(request.timestamp()).longValue());
}
return rb.build();
}
// Check term and state to step down
checkStepDown(request.term(), serverId);
// After checkStepDown the sender is expected to be the leader this node knows about; anything else means
// two peers claim leadership in the same term.
if (!serverId.equals(this.leaderId)) {
LOG.error("Another peer {} declares that it is the leader at term {} which was occupied by leader {}.",
serverId, this.currTerm, this.leaderId);
// Increase the term by 1 and make both leaders step down to minimize the
// loss of split brain
stepDown(request.term() + 1, false, new Status(RaftError.ELEADERCONFLICT,
"More than one leader in the same term."));
// The reply advertises the bumped term (request term + 1), matching the term passed to stepDown above.
AppendEntriesResponseBuilder rb = raftOptions.getRaftMessagesFactory()
.appendEntriesResponse()
.success(false) //
.term(request.term() + 1);
if (request.timestamp() != null) {
rb.timestampLong(clock.update(request.timestamp()).longValue());
}
return rb.build();
}
// The request came from the current leader: record the time of this contact.
updateLastLeaderTimestamp(Utils.monotonicMs());
// Entries are refused while a snapshot is being installed; requests without entries still pass through.
if (entriesCount > 0 && this.snapshotExecutor != null && this.snapshotExecutor.isInstallingSnapshot()) {
LOG.warn("Node {} received AppendEntriesRequest while installing snapshot.", getNodeId());
return RaftRpcFactory.DEFAULT //
.newResponse(raftOptions.getRaftMessagesFactory(), RaftError.EBUSY,
"Node %s:%s is installing snapshot.", this.groupId, this.serverId);
}
// Log matching check: the term stored locally at prevLogIndex must equal the term the sender reports for it.
final long prevLogIndex = request.prevLogIndex();
final long prevLogTerm = request.prevLogTerm();
final long localPrevLogTerm = this.logManager.getTerm(prevLogIndex);
if (localPrevLogTerm != prevLogTerm) {
final long lastLogIndex = this.logManager.getLastLogIndex();
LOG.warn("Node {} reject term_unmatched AppendEntriesRequest from {}, term={}, prevLogIndex={}, " +
"prevLogTerm={}, localPrevLogTerm={}, lastLogIndex={}, entriesSize={}.",
getNodeId(), request.serverId(), request.term(), prevLogIndex, prevLogTerm, localPrevLogTerm,
lastLogIndex, entriesCount);
// The rejection includes our last log index alongside the current term.
AppendEntriesResponseBuilder rb = raftOptions.getRaftMessagesFactory()
.appendEntriesResponse()
.success(false)
.term(this.currTerm)
.lastLogIndex(lastLogIndex);
if (request.timestamp() != null) {
rb.timestampLong(clock.update(request.timestamp()).longValue());
}
return rb.build();
}
if (entriesCount == 0) {
// heartbeat or probe request
// The response builder is fully populated while the write lock is still held.
final AppendEntriesResponseBuilder respBuilder = raftOptions.getRaftMessagesFactory()
.appendEntriesResponse()
.success(true)
.term(this.currTerm)
.lastLogIndex(this.logManager.getLastLogIndex());
if (request.timestamp() != null) {
respBuilder.timestampLong(clock.update(request.timestamp()).longValue());
}
// Release the lock here and tell the finally block not to unlock again, so that the ballot box is
// updated outside the write lock.
doUnlock = false;
this.writeLock.unlock();
// see the comments at FollowerStableClosure#run()
// The committed index is capped at prevLogIndex, the index whose term was just verified above.
this.ballotBox.setLastCommittedIndex(Math.min(request.committedIndex(), prevLogIndex));
return respBuilder.build();
}
// fast checking if log manager is overloaded
// Unlike the EBUSY case above, this one is reported inside an AppendEntriesResponse (error code and message
// set on the builder) rather than through RaftRpcFactory.
if (!this.logManager.hasAvailableCapacityToAppendEntries(1)) {
LOG.warn("Node {} received AppendEntriesRequest but log manager is busy.", getNodeId());
AppendEntriesResponseBuilder rb = raftOptions.getRaftMessagesFactory()
.appendEntriesResponse()
.success(false)
.errorCode(RaftError.EBUSY.getNumber())
.errorMsg(String.format("Node %s:%s log manager is busy.", this.groupId, this.serverId))
.term(this.currTerm);
if (request.timestamp() != null) {
rb.timestampLong(clock.update(request.timestamp()).longValue());
}
return rb.build();
}
// Parse request
// Entries are numbered consecutively starting at prevLogIndex + 1. The same read-only buffer over the
// request data (or an empty buffer when the request has none) is passed to every logEntryFromMeta call.
long index = prevLogIndex;
final List<LogEntry> entries = new ArrayList<>(entriesCount);
ByteBuffer allData = request.data() != null ? request.data().asReadOnlyByteBuffer() : ByteString.EMPTY.asReadOnlyByteBuffer();
final Collection<RaftOutter.EntryMeta> entriesList = request.entriesList();
for (RaftOutter.EntryMeta entry : entriesList) {
index++;
final LogEntry logEntry = logEntryFromMeta(index, allData, entry);
// A null result is skipped; note that the index has already been advanced for it.
if (logEntry != null) {
// Validate checksum
// Only performed when checksums are enabled in the raft options; a corrupted entry aborts the whole
// request with EINVAL and nothing from this batch is appended.
if (this.raftOptions.isEnableLogEntryChecksum() && logEntry.isCorrupted()) {
long realChecksum = logEntry.checksum();
LOG.error(
"Corrupted log entry received from leader, index={}, term={}, expectedChecksum={}, realChecksum={}",
logEntry.getId().getIndex(), logEntry.getId().getTerm(), logEntry.getChecksum(),
realChecksum);
return RaftRpcFactory.DEFAULT //
.newResponse(raftOptions.getRaftMessagesFactory(), RaftError.EINVAL,
"The log entry is corrupted, index=%d, term=%d, expectedChecksum=%d, realChecksum=%d",
logEntry.getId().getIndex(), logEntry.getId().getTerm(), logEntry.getChecksum(),
realChecksum);
}
entries.add(logEntry);
}
}
// The closure receives the request, a response builder pre-set with the current term, this node, the RPC
// closure and the current term; it is passed to the log manager together with the parsed entries.
final FollowerStableClosure closure = new FollowerStableClosure(
request,
raftOptions.getRaftMessagesFactory().appendEntriesResponse().term(this.currTerm),
this,
done,
this.currTerm
);
this.logManager.appendEntries(entries, closure);
// update configuration after _log_manager updated its memory status
checkAndSetConfiguration(true);
success = true;
// No response is returned from this path; `done` was handed to the closure above.
return null;
}
finally {
// The heartbeat path has already released the lock and cleared doUnlock.
if (doUnlock) {
this.writeLock.unlock();
}
// Latency is recorded for every outcome, including rejections, under a name chosen by entry count.
final long processLatency = Utils.monotonicMs() - startMs;
if (entriesCount == 0) {
this.metrics.recordLatency("handle-heartbeat-requests", processLatency);
} else {
this.metrics.recordLatency("handle-append-entries", processLatency);
}
if (success) {
// Heartbeat requests are not counted here: success is only set on the append path, which is reached
// only when the request carries entries.
this.metrics.recordSize("handle-append-entries-count", entriesCount);
}
}
}