in modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java [1387:1546]
/**
 * Handles the outcome of an AppendEntries RPC that replicator {@code r} sent to its peer.
 *
 * <p>The checks are applied in this order:
 * <ol>
 *     <li>If {@code inflight.startIndex != request.prevLogIndex() + 1} the response does not match the in-flight
 *     request being processed. In-flights are reset, the state becomes {@code Probe} and a probe is sent.</li>
 *     <li>If the RPC itself failed ({@code status} is not OK), the error is reported to the status listeners.
 *     In-flights are reset, the state becomes {@code Probe} and the replicator is blocked.</li>
 *     <li>If the peer answered {@code EBUSY}, in-flights are reset, the state becomes {@code Probe} and the
 *     replicator is blocked.</li>
 *     <li>If the peer reports a term greater than ours, caught-up waiters are notified with {@code EPERM} and the
 *     replicator is destroyed. The node is then asked to increase its term to the peer's term.</li>
 *     <li>Any other unsuccessful response is treated as a log mismatch. {@code nextIndex} is moved back (to the
 *     peer's {@code lastLogIndex + 1}, or by one) and a probe is sent.</li>
 *     <li>A successful response whose term differs from ours resets the replicator to {@code Probe} and unlocks
 *     {@code id}.</li>
 *     <li>Otherwise the entries are committed in the ballot box (only for follower replicators) and
 *     {@code nextIndex} advances past them. The state becomes {@code Replicate} and caught-up waiters are notified.
 *     If the pending TimeoutNow index has been reached, a TimeoutNow request is sent.</li>
 * </ol>
 *
 * @param id Replicator id. On {@code false} paths it is released here or by a callee (see the inline comments).
 * @param inflight In-flight request this response is expected to correspond to.
 * @param status Status of the RPC itself; {@code response} is only read when it is OK.
 * @param request The AppendEntries request that was sent.
 * @param response The peer's response.
 * @param rpcSendTime Send time of the request (in {@code Utils.monotonicMs()} units). It is used for the latency
 *     metric and to advance {@code lastRpcSendTimestamp}.
 * @param startTimeMs Start time handed to {@code block(...)} when the replicator has to back off.
 * @param r The replicator.
 * @return {@code true} only for a successful response with a matching term. In that case the caller presumably
 *     keeps replicating with {@code id} still locked. Returns {@code false} on every other path.
 */
private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight inflight, final Status status,
    final AppendEntriesRequest request,
    final AppendEntriesResponse response, final long rpcSendTime,
    final long startTimeMs, final Replicator r) {
    // The response must belong to the in-flight request being processed (its first entry is prevLogIndex + 1).
    // Otherwise the pipeline state is inconsistent and is rebuilt from scratch by probing.
    if (inflight.startIndex != request.prevLogIndex() + 1) {
        LOG.warn(
            "Replicator {} received invalid AppendEntriesResponse, in-flight startIndex={}, request prevLogIndex={}, reset the replicator state and probe again.",
            r, inflight.startIndex, request.prevLogIndex());
        r.resetInflights();
        r.setState(State.Probe);
        // id is unlocked in sendEmptyEntries.
        r.sendProbeRequest();
        return false;
    }
    // Record metrics (latency is measured regardless of the RPC outcome).
    if (request.entriesList() != null) {
        r.nodeMetrics.recordLatency("replicate-entries", Utils.monotonicMs() - rpcSendTime);
        r.nodeMetrics.recordSize("replicate-entries-count", request.entriesList().size());
        r.nodeMetrics.recordSize("replicate-entries-bytes", request.data() != null ? request.data().size()
            : 0);
    }
    final boolean isLogDebugEnabled = LOG.isDebugEnabled();
    // Only allocated (and only dereferenced) when debug logging is enabled.
    StringBuilder sb = null;
    if (isLogDebugEnabled) {
        sb = new StringBuilder("Node ") //
            .append(r.options.getGroupId()) //
            .append(':') //
            .append(r.options.getServerId()) //
            .append(" received AppendEntriesResponse from ") //
            .append(r.options.getPeerId()) //
            .append(" prevLogIndex=") //
            .append(request.prevLogIndex()) //
            .append(" prevLogTerm=") //
            .append(request.prevLogTerm()) //
            .append(" count=") //
            .append(Utils.size(request.entriesList()));
    }
    if (!status.isOk()) {
        // If the follower crashes, any RPC to the follower fails immediately,
        // so we need to block the follower for a while instead of looping until
        // it comes back or is removed.
        if (isLogDebugEnabled) {
            sb.append(" fail, sleep, status=") //
                .append(status);
            LOG.debug(sb.toString());
        }
        notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status);
        // Throttle the warning to every 10th consecutive failure.
        if (++r.consecutiveErrorTimes % 10 == 0) {
            LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", r.options.getPeerId(),
                r.consecutiveErrorTimes, status);
        }
        r.resetInflights();
        r.setState(State.Probe);
        // id is unlocked in block.
        r.block(startTimeMs, status.getCode());
        return false;
    }
    r.consecutiveErrorTimes = 0;
    if (!response.success()) {
        // Target node is busy, sleep for a while.
        if (response.errorCode() == RaftError.EBUSY.getNumber()) {
            if (isLogDebugEnabled) {
                sb.append(" is busy, sleep, errorMsg='") //
                    .append(response.errorMsg()).append("'");
                LOG.debug(sb.toString());
            }
            r.resetInflights();
            r.setState(State.Probe);
            // id is unlocked in block. The RPC itself succeeded (status is OK on this path), so report the
            // peer's error code rather than status.getCode(), which would only ever signal success here.
            r.block(startTimeMs, response.errorCode());
            return false;
        }
        // The peer has seen a newer term: this node can no longer act as leader for r.options.getTerm().
        if (response.term() > r.options.getTerm()) {
            if (isLogDebugEnabled) {
                sb.append(" fail, greater term ") //
                    .append(response.term()) //
                    .append(" expect term ") //
                    .append(r.options.getTerm());
                LOG.debug(sb.toString());
            }
            // Grab the node before destroying the replicator.
            final NodeImpl node = r.options.getNode();
            r.notifyOnCaughtUp(RaftError.EPERM.getNumber(), true);
            // NOTE(review): this path relies on destroy() releasing the id lock; confirm.
            r.destroy();
            node.increaseTermTo(response.term(), new Status(RaftError.EHIGHERTERMRESPONSE,
                "Leader receives higher term heartbeat_response from peer:%s", r.options.getPeerId()));
            return false;
        }
        if (isLogDebugEnabled) {
            sb.append(" fail, find nextIndex remote lastLogIndex ").append(response.lastLogIndex())
                .append(" local nextIndex ").append(r.nextIndex);
            LOG.debug(sb.toString());
        }
        if (rpcSendTime > r.lastRpcSendTimestamp) {
            r.lastRpcSendTimestamp = rpcSendTime;
        }
        // Fail, reset the state to try again from nextIndex.
        r.resetInflights();
        // prevLogIndex and prevLogTerm don't match on the peer.
        if (response.lastLogIndex() + 1 < r.nextIndex) {
            LOG.debug("LastLogIndex at peer={} is {}", r.options.getPeerId(), response.lastLogIndex());
            // The peer contains fewer logs than the leader: jump straight to the peer's end of log.
            r.nextIndex = response.lastLogIndex() + 1;
        }
        else {
            // The peer contains logs from an old term which should be truncated;
            // step nextIndex back by one to find the right index to keep.
            if (r.nextIndex > 1) {
                LOG.debug("logIndex={} dismatch", r.nextIndex);
                r.nextIndex--;
            }
            else {
                LOG.error("Peer={} declares that log at index=0 doesn't match, which is not supposed to happen",
                    r.options.getPeerId());
            }
        }
        // id is unlocked while sending the probe (sendEmptyEntries).
        r.sendProbeRequest();
        return false;
    }
    if (isLogDebugEnabled) {
        sb.append(", success");
        LOG.debug(sb.toString());
    }
    // Success, but a response from a different term cannot be trusted: fall back to probing.
    if (response.term() != r.options.getTerm()) {
        r.resetInflights();
        r.setState(State.Probe);
        LOG.error("Fail, response term {} dismatch, expect term {}", response.term(), r.options.getTerm());
        id.unlock();
        return false;
    }
    if (rpcSendTime > r.lastRpcSendTimestamp) {
        r.lastRpcSendTimestamp = rpcSendTime;
    }
    final int entriesSize = Utils.size(request.entriesList());
    if (entriesSize > 0) {
        if (r.options.getReplicatorType().isFollower()) {
            // Only commit index when the response is from follower.
            r.options.getBallotBox().commitAt(r.nextIndex, r.nextIndex + entriesSize - 1, r.options.getPeerId());
        }
        // Reuse the flag computed above instead of querying the logger again.
        if (isLogDebugEnabled) {
            LOG.debug("Replicated logs in [{}, {}] to peer {}", r.nextIndex, r.nextIndex + entriesSize - 1,
                r.options.getPeerId());
        }
    }
    r.setState(State.Replicate);
    r.blockTimer = null;
    r.nextIndex += entriesSize;
    r.hasSucceeded = true;
    r.notifyOnCaughtUp(RaftError.SUCCESS.getNumber(), false);
    // id stays locked here; presumably the caller unlocks it when sending the next entries.
    if (r.timeoutNowIndex > 0 && r.timeoutNowIndex < r.nextIndex) {
        r.sendTimeoutNow(false, false);
    }
    return true;
}