public Message handleAppendEntriesRequest()

in modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java [2144:2329]


    public Message handleAppendEntriesRequest(final AppendEntriesRequest request, final RpcRequestClosure done) {
        boolean doUnlock = true;
        final long startMs = Utils.monotonicMs();
        this.writeLock.lock();
        final int entriesCount = Utils.size(request.entriesList());
        boolean success = false;
        try {
            if (!this.state.isActive()) {
                LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
                return RaftRpcFactory.DEFAULT //
                    .newResponse(raftOptions.getRaftMessagesFactory(), RaftError.EINVAL,
                        "Node %s is not in active state, state %s.", getNodeId(), this.state.name());
            }

            final PeerId serverId = new PeerId();
            if (!serverId.parse(request.serverId())) {
                LOG.warn("Node {} received AppendEntriesRequest from {} serverId bad format.", getNodeId(),
                    request.serverId());
                return RaftRpcFactory.DEFAULT //
                    .newResponse(raftOptions.getRaftMessagesFactory(), RaftError.EINVAL,
                        "Parse serverId failed: %s.", request.serverId());
            }

            // Check stale term
            if (request.term() < this.currTerm) {
                LOG.warn("Node {} ignore stale AppendEntriesRequest from {}, term={}, currTerm={}.", getNodeId(),
                    request.serverId(), request.term(), this.currTerm);
                AppendEntriesResponseBuilder rb = raftOptions.getRaftMessagesFactory()
                        .appendEntriesResponse()
                        .success(false)
                        .term(this.currTerm);

                if (request.timestamp() != null) {
                    rb.timestampLong(clock.update(request.timestamp()).longValue());
                }

                return rb.build();
            }

            // Check term and state to step down
            checkStepDown(request.term(), serverId);
            if (!serverId.equals(this.leaderId)) {
                LOG.error("Another peer {} declares that it is the leader at term {} which was occupied by leader {}.",
                    serverId, this.currTerm, this.leaderId);
                // Increase the term by 1 and make both leaders step down to minimize the
                // loss of split brain
                stepDown(request.term() + 1, false, new Status(RaftError.ELEADERCONFLICT,
                    "More than one leader in the same term."));
                AppendEntriesResponseBuilder rb = raftOptions.getRaftMessagesFactory()
                        .appendEntriesResponse()
                        .success(false) //
                        .term(request.term() + 1);

                if (request.timestamp() != null) {
                    rb.timestampLong(clock.update(request.timestamp()).longValue());
                }

                return rb.build();
            }

            updateLastLeaderTimestamp(Utils.monotonicMs());

            if (entriesCount > 0 && this.snapshotExecutor != null && this.snapshotExecutor.isInstallingSnapshot()) {
                LOG.warn("Node {} received AppendEntriesRequest while installing snapshot.", getNodeId());
                return RaftRpcFactory.DEFAULT //
                    .newResponse(raftOptions.getRaftMessagesFactory(), RaftError.EBUSY,
                        "Node %s:%s is installing snapshot.", this.groupId, this.serverId);
            }

            final long prevLogIndex = request.prevLogIndex();
            final long prevLogTerm = request.prevLogTerm();
            final long localPrevLogTerm = this.logManager.getTerm(prevLogIndex);
            if (localPrevLogTerm != prevLogTerm) {
                final long lastLogIndex = this.logManager.getLastLogIndex();

                LOG.warn("Node {} reject term_unmatched AppendEntriesRequest from {}, term={}, prevLogIndex={}, " +
                        "prevLogTerm={}, localPrevLogTerm={}, lastLogIndex={}, entriesSize={}.",
                    getNodeId(), request.serverId(), request.term(), prevLogIndex, prevLogTerm, localPrevLogTerm,
                    lastLogIndex, entriesCount);

                AppendEntriesResponseBuilder rb = raftOptions.getRaftMessagesFactory()
                        .appendEntriesResponse()
                        .success(false)
                        .term(this.currTerm)
                        .lastLogIndex(lastLogIndex);

                if (request.timestamp() != null) {
                    rb.timestampLong(clock.update(request.timestamp()).longValue());
                }

                return rb.build();
            }

            if (entriesCount == 0) {
                // heartbeat or probe request
                final AppendEntriesResponseBuilder respBuilder = raftOptions.getRaftMessagesFactory()
                    .appendEntriesResponse()
                    .success(true)
                    .term(this.currTerm)
                    .lastLogIndex(this.logManager.getLastLogIndex());
                if (request.timestamp() != null) {
                    respBuilder.timestampLong(clock.update(request.timestamp()).longValue());
                }
                doUnlock = false;
                this.writeLock.unlock();
                // see the comments at FollowerStableClosure#run()
                this.ballotBox.setLastCommittedIndex(Math.min(request.committedIndex(), prevLogIndex));
                return respBuilder.build();
            }

            // fast checking if log manager is overloaded
            if (!this.logManager.hasAvailableCapacityToAppendEntries(1)) {
                LOG.warn("Node {} received AppendEntriesRequest but log manager is busy.", getNodeId());

                AppendEntriesResponseBuilder rb = raftOptions.getRaftMessagesFactory()
                        .appendEntriesResponse()
                        .success(false)
                        .errorCode(RaftError.EBUSY.getNumber())
                        .errorMsg(String.format("Node %s:%s log manager is busy.", this.groupId, this.serverId))
                        .term(this.currTerm);

                if (request.timestamp() != null) {
                    rb.timestampLong(clock.update(request.timestamp()).longValue());
                }

                return rb.build();
            }

            // Parse request
            long index = prevLogIndex;
            final List<LogEntry> entries = new ArrayList<>(entriesCount);
            ByteBuffer allData = request.data() != null ? request.data().asReadOnlyByteBuffer() : ByteString.EMPTY.asReadOnlyByteBuffer();

            final Collection<RaftOutter.EntryMeta> entriesList = request.entriesList();
            for (RaftOutter.EntryMeta entry : entriesList) {
                index++;

                final LogEntry logEntry = logEntryFromMeta(index, allData, entry);

                if (logEntry != null) {
                    // Validate checksum
                    if (this.raftOptions.isEnableLogEntryChecksum() && logEntry.isCorrupted()) {
                        long realChecksum = logEntry.checksum();
                        LOG.error(
                            "Corrupted log entry received from leader, index={}, term={}, expectedChecksum={}, realChecksum={}",
                            logEntry.getId().getIndex(), logEntry.getId().getTerm(), logEntry.getChecksum(),
                            realChecksum);
                        return RaftRpcFactory.DEFAULT //
                            .newResponse(raftOptions.getRaftMessagesFactory(), RaftError.EINVAL,
                                "The log entry is corrupted, index=%d, term=%d, expectedChecksum=%d, realChecksum=%d",
                                logEntry.getId().getIndex(), logEntry.getId().getTerm(), logEntry.getChecksum(),
                                realChecksum);
                    }
                    entries.add(logEntry);
                }
            }

            final FollowerStableClosure closure = new FollowerStableClosure(
                request,
                raftOptions.getRaftMessagesFactory().appendEntriesResponse().term(this.currTerm),
                this,
                done,
                this.currTerm
            );
            this.logManager.appendEntries(entries, closure);
            // update configuration after _log_manager updated its memory status
            checkAndSetConfiguration(true);
            success = true;
            return null;
        }
        finally {
            if (doUnlock) {
                this.writeLock.unlock();
            }
            final long processLatency = Utils.monotonicMs() - startMs;
            if (entriesCount == 0) {
                this.metrics.recordLatency("handle-heartbeat-requests", processLatency);
            } else {
                this.metrics.recordLatency("handle-append-entries", processLatency);
            }
            if (success) {
                // Don't stats heartbeat requests.
                this.metrics.recordSize("handle-append-entries-count", entriesCount);
            }
        }
    }