private static boolean onAppendEntriesReturned()

in modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java [1392:1549]


    /**
     * Processes the result of an AppendEntries RPC for replicator {@code r} and updates the replicator
     * state accordingly.
     *
     * <p>The branches, in the order they are evaluated:
     * <ol>
     *     <li>The in-flight record does not match the request ({@code inflight.startIndex != prevLogIndex + 1}):
     *     in-flights are reset, the state becomes {@code Probe} and a probe request is sent.</li>
     *     <li>The RPC itself failed ({@code !status.isOk()}): listeners are notified with
     *     {@code ReplicatorEvent.ERROR}, in-flights are reset, the state becomes {@code Probe} and the
     *     replicator is blocked via {@code r.block(...)}.</li>
     *     <li>The peer rejected the request ({@code !response.success()}): depending on the response, the
     *     replicator is blocked (EBUSY), destroyed and the node's term increased (higher term), or
     *     {@code nextIndex} is moved back and a probe request is sent.</li>
     *     <li>The peer accepted the request but its term differs from the local one: in-flights are reset,
     *     the state becomes {@code Probe} and {@code id} is unlocked.</li>
     *     <li>Otherwise: the replicated range is committed to the ballot box (followers only),
     *     the state becomes {@code Replicate} and {@code nextIndex} advances by the number of entries sent.</li>
     * </ol>
     *
     * <p>Locking: this method calls {@code id.unlock()} itself only in the term-mismatch branch. In the other
     * {@code false} branches the inline comments state that the lock is released by the callee
     * ({@code sendProbeRequest()} / {@code block()}); on the {@code true} path nothing here unlocks {@code id}.
     * NOTE(review): the callee and caller behavior is not visible in this method - confirm before relying on it.
     *
     * @param id Lock handle of the replicator; used here only for the explicit unlock in the term-mismatch branch.
     * @param inflight In-flight record whose {@code startIndex} is validated against the request.
     * @param status Status of the RPC call.
     * @param request The AppendEntries request that was sent.
     * @param response The AppendEntries response; read only after {@code status} has been checked to be OK.
     * @param rpcSendTime Send time of the RPC; used for the latency metric (against {@code Utils.monotonicMs()})
     *      and to advance {@code r.lastRpcSendTimestamp}.
     * @param startTimeMs Start time passed through to {@code r.block(...)}.
     * @param r The replicator the response belongs to.
     * @return {@code true} only when the response was successful, carried the expected term and the replicator
     *      state was advanced; {@code false} in every other branch.
     */
    private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight inflight, final Status status,
        final AppendEntriesRequest request,
        final AppendEntriesResponse response, final long rpcSendTime,
        final long startTimeMs, final Replicator r) {
        // The response must correspond to the in-flight request: its first index has to directly follow
        // the request's prevLogIndex. Otherwise drop all in-flight state and start probing again.
        if (inflight.startIndex != request.prevLogIndex() + 1) {
            LOG.warn(
                "Replicator {} received invalid AppendEntriesResponse, in-flight startIndex={}, request prevLogIndex={}, reset the replicator state and probe again.",
                r, inflight.startIndex, request.prevLogIndex());
            r.resetInflights();
            r.setState(State.Probe);
            // The id lock is expected to be released inside sendProbeRequest() - NOTE(review): confirm.
            r.sendProbeRequest();
            return false;
        }
        // Record metrics (only when the request has a non-null entries list).
        if (request.entriesList() != null) {
            r.nodeMetrics.recordLatency("replicate-entries", Utils.monotonicMs() - rpcSendTime);
            r.nodeMetrics.recordSize("replicate-entries-count", request.entriesList().size());
            r.nodeMetrics.recordSize("replicate-entries-bytes", request.data() != null ? request.data().capacity() : 0);
        }

        // The debug message is built incrementally: a common prefix here, a branch-specific suffix below.
        // sb stays null when debug logging is disabled, so every use of it is guarded by isLogDebugEnabled.
        final boolean isLogDebugEnabled = LOG.isDebugEnabled();
        StringBuilder sb = null;
        if (isLogDebugEnabled) {
            sb = new StringBuilder("Node ") //
                .append(r.options.getGroupId()) //
                .append(':') //
                .append(r.options.getServerId()) //
                .append(" received AppendEntriesResponse from ") //
                .append(r.options.getPeerId()) //
                .append(" prevLogIndex=") //
                .append(request.prevLogIndex()) //
                .append(" prevLogTerm=") //
                .append(request.prevLogTerm()) //
                .append(" count=") //
                .append(Utils.size(request.entriesList()));
        }
        if (!status.isOk()) {
            // If the follower crashes, any RPC to the follower fails immediately,
            // so we need to block the follower for a while instead of looping until
            // it comes back or is removed.
            // The id lock is expected to be released inside block() - NOTE(review): confirm.
            if (isLogDebugEnabled) {
                sb.append(" fail, sleep, status=") //
                    .append(status);
                LOG.debug(sb.toString());
            }
            notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status);
            // Log only every 10th consecutive failure. Because of short-circuit evaluation the counter
            // is not incremented for ESHUTDOWN errors.
            if (status.getRaftError() != RaftError.ESHUTDOWN && ++r.consecutiveErrorTimes % 10 == 0) {
                logFailToIssueRpc(status, r);
            }
            r.resetInflights();
            r.setState(State.Probe);
            // Unlock happens in block().
            r.block(startTimeMs, status.getCode());
            return false;
        }
        // The RPC went through, so the consecutive error streak is over.
        r.consecutiveErrorTimes = 0;
        if (!response.success()) {
             // Target node is busy, sleep for a while.
            if (response.errorCode() == RaftError.EBUSY.getNumber()) {
                if (isLogDebugEnabled) {
                    sb.append(" is busy, sleep, errorMsg='") //
                        .append(response.errorMsg()).append("'");
                    LOG.debug(sb.toString());
                }
                r.resetInflights();
                r.setState(State.Probe);
                // Unlock happens in block().
                // NOTE(review): status is known to be OK at this point (the !status.isOk() branch returned
                // above), so status.getCode() is the code of an OK status rather than EBUSY - confirm that
                // this is the value block() is meant to receive.
                r.block(startTimeMs, status.getCode());
                return false;
            }

            // The peer reports a term higher than the one this replicator was created with:
            // tear the replicator down and let the node move to the higher term.
            if (response.term() > r.options.getTerm()) {
                if (isLogDebugEnabled) {
                    sb.append(" fail, greater term ") //
                        .append(response.term()) //
                        .append(" expect term ") //
                        .append(r.options.getTerm());
                    LOG.debug(sb.toString());
                }
                // The node reference is read before destroy() and used after it.
                final NodeImpl node = r.options.getNode();
                r.notifyOnCaughtUp(RaftError.EPERM.getNumber(), true);
                r.destroy();
                // NOTE(review): the status message says "heartbeat_response" although this handler processes
                // an AppendEntries response - possibly copied from the heartbeat handler; confirm.
                node.increaseTermTo(response.term(), new Status(RaftError.EHIGHERTERMRESPONSE,
                    "Leader receives higher term heartbeat_response from peer:%s", r.options.getPeerId()));
                return false;
            }
            if (isLogDebugEnabled) {
                sb.append(" fail, find nextIndex remote lastLogIndex ").append(response.lastLogIndex())
                    .append(" local nextIndex ").append(r.nextIndex);
                LOG.debug(sb.toString());
            }
            // Only ever move lastRpcSendTimestamp forward.
            if (rpcSendTime > r.lastRpcSendTimestamp) {
                r.lastRpcSendTimestamp = rpcSendTime;
            }
            // Fail, reset the state to try again from nextIndex.
            r.resetInflights();
            // prevLogIndex and prevLogTerm don't match on the peer.
            if (response.lastLogIndex() + 1 < r.nextIndex) {
                LOG.debug("LastLogIndex at peer={} is {}", r.options.getPeerId(), response.lastLogIndex());
                // The peer contains fewer logs than the leader: continue right after the peer's last log index.
                r.nextIndex = response.lastLogIndex() + 1;
            }
            else {
                // The peer contains logs from an old term which should be truncated,
                // decrease nextIndex by one to test the right index to keep.
                if (r.nextIndex > 1) {
                    LOG.debug("logIndex={} dismatch", r.nextIndex);
                    r.nextIndex--;
                }
                else {
                    // nextIndex is already at its minimum here, so it is left unchanged and only an error is logged.
                    LOG.error("Peer={} declares that log at index=0 doesn't match, which is not supposed to happen",
                        r.options.getPeerId());
                }
            }
            // The id lock is expected to be released inside sendProbeRequest() - NOTE(review): confirm.
            r.sendProbeRequest();
            return false;
        }
        if (isLogDebugEnabled) {
            sb.append(", success");
            LOG.debug(sb.toString());
        }
        // Success: the peer accepted the request. Its term must still equal the replicator's term;
        // a higher term was handled only on the failure path above.
        if (response.term() != r.options.getTerm()) {
            r.resetInflights();
            r.setState(State.Probe);
            LOG.error("Fail, response term {} dismatch, expect term {}", response.term(), r.options.getTerm());
            // The only branch in which this method releases the id lock itself.
            id.unlock();
            return false;
        }
        // Only ever move lastRpcSendTimestamp forward.
        if (rpcSendTime > r.lastRpcSendTimestamp) {
            r.lastRpcSendTimestamp = rpcSendTime;
        }
        final int entriesSize = Utils.size(request.entriesList());
        if (entriesSize > 0) {
            if (r.options.getReplicatorType().isFollower()) {
                // Only commit index when the response is from follower.
                // The committed range is [nextIndex, nextIndex + entriesSize - 1]; nextIndex itself is advanced below.
                r.options.getBallotBox().commitAt(r.nextIndex, r.nextIndex + entriesSize - 1, r.options.getPeerId());
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Replicated logs in [{}, {}] to peer {}", r.nextIndex, r.nextIndex + entriesSize - 1,
                    r.options.getPeerId());
            }
        }

        r.setState(State.Replicate);
        r.blockTimer = null;
        r.nextIndex += entriesSize;
        r.hasSucceeded = true;
        r.notifyOnCaughtUp(RaftError.SUCCESS.getNumber(), false);
        // The id lock is not released on this path; the original comment says it is unlocked when the next
        // entries are sent - NOTE(review): confirm against the caller.
        // A timeout-now request is sent once nextIndex has moved past a positive timeoutNowIndex.
        if (r.timeoutNowIndex > 0 && r.timeoutNowIndex < r.nextIndex) {
            r.sendTimeoutNow(false, false);
        }
        return true;
    }