in fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java [606:703]
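    /**
     * Appends the given records to the local log: as leader it assigns offsets and timestamps,
     * as follower it keeps the leader-assigned offsets, and in both cases it returns metadata
     * about the append. (Descriptive summary added for this excerpt.)
     */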
    private LogAppendInfo append(MemoryLogRecords records, boolean appendAsLeader)
            throws Exception {
        LogAppendInfo appendInfo = analyzeAndValidateRecords(records);

        // return if we have no valid records.
        if (appendInfo.shallowCount() == 0) {
            return appendInfo;
        }

        // trim any invalid bytes or partial messages before appending it to the on-disk log.
        MemoryLogRecords validRecords = trimInvalidBytes(records, appendInfo);

        synchronized (lock) {
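            // fail fast if the log's memory-mapped buffers have already been closed.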
            localLog.checkIfMemoryMappedBufferClosed();
            if (appendAsLeader) {
                long offset = localLog.getLocalLogEndOffset();
                // assign offsets to the message set.
                appendInfo.setFirstOffset(offset);
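                // stamp timestamps from the clock, but never earlier than the max timestamp
                // already in the log.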
                AssignResult result =
                        assignOffsetAndTimestamp(
                                validRecords,
                                offset,
                                Math.max(localLog.getLocalMaxTimestamp(), clock.milliseconds()));
                appendInfo.setLastOffset(result.lastOffset);
                appendInfo.setMaxTimestamp(result.maxTimestamp);
                appendInfo.setStartOffsetOfMaxTimestamp(result.startOffsetOfMaxTimestampMs);
            } else {
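                // as follower, keep the offsets already assigned by the leader and only verify
                // that they are monotonically increasing.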
                if (!appendInfo.offsetsMonotonic()) {
                    throw new FlussRuntimeException("Out of order offsets found.");
                }
            }

            // maybe roll the log if this segment is full.
            maybeRoll(validRecords.sizeInBytes(), appendInfo);

            // now that we have valid records, offsets assigned, we need to validate the idempotent
            // state of the writers and collect some metadata.
            Either<WriterStateEntry.BatchMetadata, Collection<WriterAppendInfo>> validateResult =
                    analyzeAndValidateWriterState(validRecords);
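            // left: metadata of a batch that was already appended (a duplicate detected via
            // writer id and batch sequence); right: per-writer append info to apply to the
            // writer state after the append.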
            if (validateResult.isLeft()) {
                // have duplicated batch metadata, skip the append and update append info.
                WriterStateEntry.BatchMetadata duplicatedBatch = validateResult.left();
                long startOffset = duplicatedBatch.firstOffset();
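                // a leader answers a duplicate with the metadata of the batch that was already
                // appended; a follower fails the append instead.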
                if (appendAsLeader) {
                    appendInfo.setFirstOffset(startOffset);
                    appendInfo.setLastOffset(duplicatedBatch.lastOffset);
                    appendInfo.setMaxTimestamp(duplicatedBatch.timestamp);
                    appendInfo.setStartOffsetOfMaxTimestamp(startOffset);
                    appendInfo.setDuplicated(true);
                } else {
                    String errorMsg =
                            String.format(
                                    "Found duplicated batch for table bucket %s, duplicated offset is %s, "
                                            + "writer id is %s and batch sequence is: %s",
                                    getTableBucket(),
                                    duplicatedBatch.lastOffset,
                                    duplicatedBatch.writerId,
                                    duplicatedBatch.batchSequence);
                    LOG.error(errorMsg);
                    throw new DuplicateSequenceException(errorMsg);
                }
            } else {
                // Append the records, and increment the local log end offset immediately after
                // append because write to the transaction index below may fail, and we want to
                // ensure that the offsets of future appends still grow monotonically.
                localLog.append(
                        appendInfo.lastOffset(),
                        appendInfo.maxTimestamp(),
                        appendInfo.startOffsetOfMaxTimestamp(),
                        validRecords);
                updateHighWatermarkWithLogEndOffset();

                // update the writer state.
                Collection<WriterAppendInfo> updatedWriters = validateResult.right();
                updatedWriters.forEach(writerStateManager::update);

                // always update the last writer id map offset so that the snapshot reflects
                // the current offset even if there isn't any idempotent data being written.
                writerStateManager.updateMapEndOffset(appendInfo.lastOffset() + 1);

                // todo update the first unstable offset (which is used to compute lso)

                LOG.trace(
                        "Appended message set with last offset: {}, first offset {}, next offset: {} and messages {}",
                        appendInfo.lastOffset(),
                        appendInfo.firstOffset(),
                        localLog.getLocalLogEndOffset(),
                        validRecords);
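                // flush to disk once enough messages have accumulated since the last flush.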
                if (localLog.unflushedMessages() >= logFlushIntervalMessages) {
                    flush(false);
                }
            }
            return appendInfo;
        }
    }