in core/src/main/scala/kafka/log/UnifiedLog.scala [738:912]
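/**
 * Append the given records to the active segment of the log.
 *
 * When `validateAndAssignOffsets` is true (the leader path), the broker assigns offsets and validates
 * (and possibly recompresses or format-converts) the records; otherwise (the follower path) the offsets
 * carried by the records are used as-is and only checked against the current log end offset.
 */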
private def append(records: MemoryRecords,
                   origin: AppendOrigin,
                   validateAndAssignOffsets: Boolean,
                   leaderEpoch: Int,
                   requestLocal: Option[RequestLocal],
                   verificationGuard: VerificationGuard,
                   ignoreRecordSize: Boolean,
                   toMagic: Byte = RecordBatch.CURRENT_MAGIC_VALUE): LogAppendInfo = {
  // We want to ensure the partition metadata file is written to the log dir before any log data is written to disk.
  // This will ensure that any log data can be recovered with the correct topic ID in the case of failure.
  maybeFlushMetadataFile()

  val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, !validateAndAssignOffsets, leaderEpoch)
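  // analyzeAndValidateRecords performs shallow validation (batch checksums and sizes, offset/timestamp
  // summary) and builds the initial LogAppendInfo; no offsets have been assigned at this point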
  // return if we have no valid messages or if this is a duplicate of the last appended entry
  if (appendInfo.validBytes <= 0) appendInfo
  else {
    // trim any invalid bytes or partial messages before appending them to the on-disk log
    var validRecords = trimInvalidBytes(records, appendInfo)

    // they are valid, insert them in the log
    lock synchronized {
      maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
        localLog.checkIfMemoryMappedBufferClosed()
        if (validateAndAssignOffsets) {
          // assign offsets to the message set
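          // `offset` is a mutable ref: the validator advances it as it stamps each record, so after
          // validation it holds the next offset to assign (hence lastOffset = offset.value - 1 below)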
          val offset = PrimitiveRef.ofLong(localLog.logEndOffset)
          appendInfo.setFirstOffset(offset.value)
          val validateAndOffsetAssignResult = try {
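            // with compression.type=producer the target codec matches the source codec and batches can
            // often be kept as-is; any other broker/topic codec forces recompression during validation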
            val targetCompression = BrokerCompressionType.targetCompression(config.compression, appendInfo.sourceCompression())
            val validator = new LogValidator(validRecords,
              topicPartition,
              time,
              appendInfo.sourceCompression,
              targetCompression,
              config.compact,
              toMagic,
              config.messageTimestampType,
              config.messageTimestampBeforeMaxMs,
              config.messageTimestampAfterMaxMs,
              leaderEpoch,
              origin
            )
            validator.validateMessagesAndAssignOffsets(offset,
              validatorMetricsRecorder,
              requestLocal.getOrElse(throw new IllegalArgumentException(
                "requestLocal should be defined if assignOffsets is true")
              ).bufferSupplier
            )
          } catch {
            case e: IOException =>
              throw new KafkaException(s"Error validating messages while appending to log $name", e)
          }
          validRecords = validateAndOffsetAssignResult.validatedRecords
          appendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs)
          appendInfo.setLastOffset(offset.value - 1)
          appendInfo.setRecordValidationStats(validateAndOffsetAssignResult.recordValidationStats)
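          // when the topic uses LOG_APPEND_TIME, the validator overwrote every record's timestamp with
          // the broker's clock, so surface that time to the caller as well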
          if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
            appendInfo.setLogAppendTime(validateAndOffsetAssignResult.logAppendTimeMs)

          // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
          // format conversion)
          if (!ignoreRecordSize && validateAndOffsetAssignResult.messageSizeMaybeChanged) {
            validRecords.batches.forEach { batch =>
              if (batch.sizeInBytes > config.maxMessageSize) {
                // we record the original message set size instead of the trimmed size
                // to be consistent with pre-compression bytesRejectedRate recording
                brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
                brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
                throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to " +
                  s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
              }
            }
          }
        } else {
          // we are taking the offsets we are given
          if (appendInfo.firstOrLastOffsetOfFirstBatch < localLog.logEndOffset) {
            // we may still be able to recover if the log is empty
            // one example: fetching from log start offset on the leader which is not batch aligned,
            // which may happen as a result of AdminClient#deleteRecords()
            val hasFirstOffset = appendInfo.firstOffset != UnifiedLog.UnknownOffset
            val firstOffset = if (hasFirstOffset) appendInfo.firstOffset else records.batches.iterator().next().baseOffset()
            val firstOrLast = if (hasFirstOffset) "First offset" else "Last offset of the first batch"
            throw new UnexpectedAppendOffsetException(
              s"Unexpected offset in append to $topicPartition. $firstOrLast " +
              s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${localLog.logEndOffset}. " +
              s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
              s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
              firstOffset, appendInfo.lastOffset)
          }
        }
        // update the epoch cache with the epoch stamped onto the message by the leader
        validRecords.batches.forEach { batch =>
          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
            assignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
          } else {
            // In partial upgrade scenarios, we may get a temporary regression to the message format. In
            // order to ensure the safety of leader election, we clear the epoch cache so that we revert
            // to truncation by high watermark after the next leader election.
            if (leaderEpochCache.nonEmpty) {
              warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")
              leaderEpochCache.clearAndFlush()
            }
          }
        }
        // the message set must fit in a single segment, so reject any set larger than config.segmentSize
        if (validRecords.sizeInBytes > config.segmentSize) {
          throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +
            s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
        }
        // maybe roll the log if this segment is full
        val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)

        val logOffsetMetadata = new LogOffsetMetadata(
          appendInfo.firstOrLastOffsetOfFirstBatch,
          segment.baseOffset,
          segment.size)
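        // the metadata above pinpoints where this append will start: its message offset, the base offset
        // of the (possibly just-rolled) active segment, and the current byte position within that segment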
        // now that we have valid records, offsets assigned, and timestamps updated, we need to
        // validate the idempotent/transactional state of the producers and collect some metadata
        val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
          logOffsetMetadata, validRecords, origin, verificationGuard)

        maybeDuplicate match {
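          // an idempotent producer retried a batch we already appended: return the original batch's
          // metadata instead of appending it a second time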
          case Some(duplicate) =>
            appendInfo.setFirstOffset(duplicate.firstOffset)
            appendInfo.setLastOffset(duplicate.lastOffset)
            appendInfo.setLogAppendTime(duplicate.timestamp)
            appendInfo.setLogStartOffset(logStartOffset)
          case None =>
            // Append the records, and increment the local log end offset immediately after the append because a
            // write to the transaction index below may fail and we want to ensure that the offsets
            // of future appends still grow monotonically. The resulting transaction index inconsistency
            // will be cleaned up after the log directory is recovered. Note that the end offset of the
            // ProducerStateManager will not be updated and the last stable offset will not advance
            // if the append to the transaction index fails.
            localLog.append(appendInfo.lastOffset, validRecords)
            updateHighWatermarkWithLogEndOffset()

            // update the producer state
            updatedProducers.values.forEach(producerAppendInfo => producerStateManager.update(producerAppendInfo))

            // update the transaction index with the true last stable offset. The last offset visible
            // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
            completedTxns.foreach { completedTxn =>
              val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
              segment.updateTxnIndex(completedTxn, lastStableOffset)
              producerStateManager.completeTxn(completedTxn)
            }

            // always update the last producer id map offset so that the snapshot reflects the current offset
            // even if there isn't any idempotent data being written
            producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)

            // update the first unstable offset (which is used to compute LSO)
            maybeIncrementFirstUnstableOffset()

            trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +
              s"first offset: ${appendInfo.firstOffset}, " +
              s"next offset: ${localLog.logEndOffset}, " +
              s"and messages: $validRecords")
            if (localLog.unflushedMessages >= config.flushInterval) flush(false)
        }

        appendInfo
      }
    }
  }
}