private LogAppendInfo append()

in fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java [606:703]


    private LogAppendInfo append(MemoryLogRecords records, boolean appendAsLeader)
            throws Exception {
        LogAppendInfo appendInfo = analyzeAndValidateRecords(records);

        // return if we have no valid records.
        if (appendInfo.shallowCount() == 0) {
            return appendInfo;
        }

        // trim any invalid bytes or partial messages before appending it to the on-disk log.
        MemoryLogRecords validRecords = trimInvalidBytes(records, appendInfo);

        synchronized (lock) {
            localLog.checkIfMemoryMappedBufferClosed();
            if (appendAsLeader) {
                long offset = localLog.getLocalLogEndOffset();
                // assign offsets to the message set.
                appendInfo.setFirstOffset(offset);

                AssignResult result =
                        assignOffsetAndTimestamp(
                                validRecords,
                                offset,
                                Math.max(localLog.getLocalMaxTimestamp(), clock.milliseconds()));
                appendInfo.setLastOffset(result.lastOffset);
                appendInfo.setMaxTimestamp(result.maxTimestamp);
                appendInfo.setStartOffsetOfMaxTimestamp(result.startOffsetOfMaxTimestampMs);
            } else {
                if (!appendInfo.offsetsMonotonic()) {
                    throw new FlussRuntimeException("Out of order offsets found.");
                }
            }

            // maybe roll the log if this segment is full.
            maybeRoll(validRecords.sizeInBytes(), appendInfo);

            // now that we have valid records, offsets assigned, we need to validate the idempotent
            // state of the writers and collect some metadata.
            Either<WriterStateEntry.BatchMetadata, Collection<WriterAppendInfo>> validateResult =
                    analyzeAndValidateWriterState(validRecords);

            if (validateResult.isLeft()) {
                // have duplicated batch metadata, skip the append and update append info.
                WriterStateEntry.BatchMetadata duplicatedBatch = validateResult.left();
                long startOffset = duplicatedBatch.firstOffset();
                if (appendAsLeader) {
                    appendInfo.setFirstOffset(startOffset);
                    appendInfo.setLastOffset(duplicatedBatch.lastOffset);
                    appendInfo.setMaxTimestamp(duplicatedBatch.timestamp);
                    appendInfo.setStartOffsetOfMaxTimestamp(startOffset);
                    appendInfo.setDuplicated(true);
                } else {
                    String errorMsg =
                            String.format(
                                    "Found duplicated batch for table bucket %s, duplicated offset is %s, "
                                            + "writer id is %s and batch sequence is: %s",
                                    getTableBucket(),
                                    duplicatedBatch.lastOffset,
                                    duplicatedBatch.writerId,
                                    duplicatedBatch.batchSequence);
                    LOG.error(errorMsg);
                    throw new DuplicateSequenceException(errorMsg);
                }
            } else {
                // Append the records, and increment the local log end offset immediately after
                // append because write to the transaction index below may fail, and we want to
                // ensure that the offsets of future appends still grow monotonically.
                localLog.append(
                        appendInfo.lastOffset(),
                        appendInfo.maxTimestamp(),
                        appendInfo.startOffsetOfMaxTimestamp(),
                        validRecords);
                updateHighWatermarkWithLogEndOffset();

                // update the writer state.
                Collection<WriterAppendInfo> updatedWriters = validateResult.right();
                updatedWriters.forEach(writerStateManager::update);

                // always update the last writer id map offset so that the snapshot reflects
                // the current offset even if there isn't any idempotent data being written.
                writerStateManager.updateMapEndOffset(appendInfo.lastOffset() + 1);

                // todo update the first unstable offset (which is used to compute lso)

                LOG.trace(
                        "Appended message set with last offset: {}, first offset {}, next offset: {} and messages {}",
                        appendInfo.lastOffset(),
                        appendInfo.firstOffset(),
                        localLog.getLocalLogEndOffset(),
                        validRecords);

                if (localLog.unflushedMessages() >= logFlushIntervalMessages) {
                    flush(false);
                }
            }
            return appendInfo;
        }
    }