in core/src/main/scala/kafka/log/LogValidator.scala [342:477]
def validateMessagesAndAssignOffsetsCompressed(records: MemoryRecords,
                                               topicPartition: TopicPartition,
                                               offsetCounter: LongRef,
                                               time: Time,
                                               now: Long,
                                               sourceCodec: CompressionCodec,
                                               targetCodec: CompressionCodec,
                                               compactedTopic: Boolean,
                                               toMagic: Byte,
                                               timestampType: TimestampType,
                                               timestampDiffMaxMs: Long,
                                               partitionLeaderEpoch: Int,
                                               origin: AppendOrigin,
                                               interBrokerProtocolVersion: ApiVersion,
                                               brokerTopicStats: BrokerTopicStats): ValidationAndOffsetAssignResult = {
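
  // ZStandard output is only allowed once the inter-broker protocol is at 2.1 (KAFKA_2_1_IV0) or above;
  // fail the request up front otherwise.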
  if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0)
    throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " +
      "are not allowed to use ZStandard compression")

  def validateRecordCompression(batchIndex: Int, record: Record): Option[ApiRecordError] = {
    if (sourceCodec != NoCompressionCodec && record.isCompressed)
      Some(ApiRecordError(Errors.INVALID_RECORD, new RecordError(batchIndex,
        s"Compressed outer record should not have an inner record with a compression attribute set: $record")))
    else None
  }

  // No in-place assignment situation 1: the source and target codecs differ, so the payload must be recompressed.
  var inPlaceAssignment = sourceCodec == targetCodec

  var maxTimestamp = RecordBatch.NO_TIMESTAMP
  val expectedInnerOffset = new LongRef(0)
  val validatedRecords = new mutable.ArrayBuffer[Record]

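  // Running total of the batch-header and record sizes after decompression; it feeds the
  // RecordConversionStats returned with the result.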
  var uncompressedSizeInBytes = 0

  // Assume there is only one batch with compressed memory records; otherwise an InvalidRecordException results.
  // The exception is that with a format older than v2 and an uncompressed sourceCodec, each batch is actually a
  // single record, so it is special-cased by creating a single wrapper batch that includes all the records.
  val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, sourceCodec)

  // No in-place assignment situations 2 and 3: we only need to check the first batch because:
  //  1. For most cases (e.g. compressed records on v2) there is only one batch anyway.
  //  2. For the cases where there may be multiple batches, all batches' magic should be the same.
  if (firstBatch.magic != toMagic || toMagic == RecordBatch.MAGIC_VALUE_V0)
    inPlaceAssignment = false

  // Do not compress control records unless they are written compressed
  if (sourceCodec == NoCompressionCodec && firstBatch.isControlBatch)
    inPlaceAssignment = true

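  // Validate every batch and every inner record, collecting the records that pass validation and
  // falling back to full offset reassignment if any inner offset is not what an in-place update expects.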
  val batches = records.batches.asScala
  for (batch <- batches) {
    validateBatch(topicPartition, firstBatch, batch, origin, toMagic, brokerTopicStats)
    uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())

    // if we are on version 2 and beyond, and we know we are going for in place assignment,
    // then we can optimize the iterator to skip key / value / headers since they would not be used at all
    val recordsIterator = if (inPlaceAssignment && firstBatch.magic >= RecordBatch.MAGIC_VALUE_V2)
      batch.skipKeyValueIterator(BufferSupplier.NO_CACHING)
    else
      batch.streamingIterator(BufferSupplier.NO_CACHING)

    try {
      val recordErrors = new ArrayBuffer[ApiRecordError](0)
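      // Relative index of the current record within its batch, used when reporting per-record errors.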
      var batchIndex = 0
      for (record <- recordsIterator.asScala) {
        val expectedOffset = expectedInnerOffset.getAndIncrement()
        val recordError = validateRecordCompression(batchIndex, record).orElse {
          validateRecord(batch, topicPartition, record, batchIndex, now,
            timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats).orElse {
            if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
              if (record.timestamp > maxTimestamp)
                maxTimestamp = record.timestamp

              // Some older clients do not implement the V1 internal offsets correctly.
              // Historically the broker handled this by rewriting the batches rather
              // than rejecting the request. We must continue this handling here to avoid
              // breaking these clients.
              if (record.offset != expectedOffset)
                inPlaceAssignment = false
            }
            None
          }
        }

        recordError match {
          case Some(e) => recordErrors += e
          case None =>
            uncompressedSizeInBytes += record.sizeInBytes()
            validatedRecords += record
        }

        batchIndex += 1
      }
      processRecordErrors(recordErrors)
    } finally {
      recordsIterator.close()
    }
  }

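  // Either rebuild the batches from the validated records with freshly assigned offsets (converting magic and
  // codec as needed), or patch the existing batch header in place and keep the compressed payload untouched.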
  if (!inPlaceAssignment) {
    val (producerId, producerEpoch, sequence, isTransactional) = {
      // note that we only reassign offsets for requests coming straight from a producer. For records with magic V2,
      // there should be exactly one RecordBatch per request, so the following is all we need to do. For records
      // with older magic versions, there will never be a producer id, etc.
      val first = records.batches.asScala.head
      (first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
    }
    buildRecordsAndAssignOffsets(toMagic, offsetCounter, time, timestampType, CompressionType.forId(targetCodec.codec),
      now, validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch,
      uncompressedSizeInBytes)
  } else {
    // we can update the batch header only and write the compressed payload as is;
    // again we assume only one record batch within the compressed set
    val batch = records.batches.iterator.next()
    val lastOffset = offsetCounter.addAndGet(validatedRecords.size) - 1

    batch.setLastOffset(lastOffset)

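    // With LOG_APPEND_TIME the broker's append time overrides whatever timestamps the producer supplied.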
    if (timestampType == TimestampType.LOG_APPEND_TIME)
      maxTimestamp = now

    if (toMagic >= RecordBatch.MAGIC_VALUE_V1)
      batch.setMaxTimestamp(timestampType, maxTimestamp)

    if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
      batch.setPartitionLeaderEpoch(partitionLeaderEpoch)

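    // Nothing was rewritten, so the conversion stats only account for the memory needed to decompress
    // and validate the records.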
    val recordConversionStats = new RecordConversionStats(uncompressedSizeInBytes, 0, 0)
    ValidationAndOffsetAssignResult(validatedRecords = records,
      maxTimestamp = maxTimestamp,
      shallowOffsetOfMaxTimestamp = lastOffset,
      messageSizeMaybeChanged = false,
      recordConversionStats = recordConversionStats)
  }
}