def validateMessagesAndAssignOffsetsCompressed()

in core/src/main/scala/kafka/log/LogValidator.scala [342:477]


  def validateMessagesAndAssignOffsetsCompressed(records: MemoryRecords,
                                                 topicPartition: TopicPartition,
                                                 offsetCounter: LongRef,
                                                 time: Time,
                                                 now: Long,
                                                 sourceCodec: CompressionCodec,
                                                 targetCodec: CompressionCodec,
                                                 compactedTopic: Boolean,
                                                 toMagic: Byte,
                                                 timestampType: TimestampType,
                                                 timestampDiffMaxMs: Long,
                                                 partitionLeaderEpoch: Int,
                                                 origin: AppendOrigin,
                                                 interBrokerProtocolVersion: ApiVersion,
                                                 brokerTopicStats: BrokerTopicStats): ValidationAndOffsetAssignResult = {

    if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0)
      throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " +
        "are not allowed to use ZStandard compression")

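    // Records inside a compressed wrapper must not themselves carry a compression attribute; reject any that do.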
    def validateRecordCompression(batchIndex: Int, record: Record): Option[ApiRecordError] = {
      if (sourceCodec != NoCompressionCodec && record.isCompressed)
        Some(ApiRecordError(Errors.INVALID_RECORD, new RecordError(batchIndex,
          s"Compressed outer record should not have an inner record with a compression attribute set: $record")))
      else None
    }

    // No in-place assignment situation 1: offsets cannot be assigned in place if the source and target codecs differ
    var inPlaceAssignment = sourceCodec == targetCodec

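    // State accumulated while scanning the batches: the largest record timestamp seen so far, the expected
    // inner offset of the next record, and the records that have passed validation.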
    var maxTimestamp = RecordBatch.NO_TIMESTAMP
    val expectedInnerOffset = new LongRef(0)
    val validatedRecords = new mutable.ArrayBuffer[Record]

    var uncompressedSizeInBytes = 0

    // Assume there's only one batch with compressed memory records; otherwise, an InvalidRecordException is thrown.
    // One exception is that with formats older than v2, if sourceCodec is NoCompressionCodec, then each batch is
    // actually a single record, so we need to handle it specially by creating a single wrapper batch that includes
    // all the records.
    val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, sourceCodec)

    // No in-place assignment situations 2 and 3: we only need to check the first batch because:
    //  1. For most cases (e.g. compressed records with magic v2), there is only one batch anyway.
    //  2. For cases where there may be multiple batches, all batches' magic should be the same.
    if (firstBatch.magic != toMagic || toMagic == RecordBatch.MAGIC_VALUE_V0)
      inPlaceAssignment = false

    // Do not compress control records unless they were already written compressed
    if (sourceCodec == NoCompressionCodec && firstBatch.isControlBatch)
      inPlaceAssignment = true

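    // Validate every batch header and every record it contains, accumulating the uncompressed size so it can
    // be reported in the record conversion stats.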
    val batches = records.batches.asScala
    for (batch <- batches) {
      validateBatch(topicPartition, firstBatch, batch, origin, toMagic, brokerTopicStats)
      uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())

      // If the magic is v2 or beyond, and we know we are doing in-place assignment, we can optimize the
      // iterator to skip the key / value / headers, since they will not be used at all.
      val recordsIterator = if (inPlaceAssignment && firstBatch.magic >= RecordBatch.MAGIC_VALUE_V2)
        batch.skipKeyValueIterator(BufferSupplier.NO_CACHING)
      else
        batch.streamingIterator(BufferSupplier.NO_CACHING)

      try {
        val recordErrors = new ArrayBuffer[ApiRecordError](0)
        var batchIndex = 0
        for (record <- recordsIterator.asScala) {
          val expectedOffset = expectedInnerOffset.getAndIncrement()
          val recordError = validateRecordCompression(batchIndex, record).orElse {
            validateRecord(batch, topicPartition, record, batchIndex, now,
              timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats).orElse {
              if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
                if (record.timestamp > maxTimestamp)
                  maxTimestamp = record.timestamp

                // Some older clients do not implement the V1 internal offsets correctly.
                // Historically the broker handled this by rewriting the batches rather
                // than rejecting the request. We must continue this handling here to avoid
                // breaking these clients.
                if (record.offset != expectedOffset)
                  inPlaceAssignment = false
              }
              None
            }
          }

          recordError match {
            case Some(e) => recordErrors += e
            case None =>
              uncompressedSizeInBytes += record.sizeInBytes()
              validatedRecords += record
          }
          batchIndex += 1
        }
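        // Surface any per-record validation errors collected for this batch.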
        processRecordErrors(recordErrors)
      } finally {
        recordsIterator.close()
      }
    }

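    // Offsets cannot be assigned in place: rebuild the records with the target magic and codec,
    // assigning absolute offsets as we go.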
    if (!inPlaceAssignment) {
      val (producerId, producerEpoch, sequence, isTransactional) = {
        // Note that we only reassign offsets for requests coming straight from a producer. For records with magic v2,
        // there should be exactly one RecordBatch per request, so the following is all we need to do. For records
        // with older magic versions, there will never be a producer id, etc.
        val first = records.batches.asScala.head
        (first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
      }
      buildRecordsAndAssignOffsets(toMagic, offsetCounter, time, timestampType, CompressionType.forId(targetCodec.codec),
        now, validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch,
        uncompressedSizeInBytes)
    } else {
      // We can update the batch header only and write the compressed payload as-is;
      // again, we assume there is only one record batch within the compressed set
      val batch = records.batches.iterator.next()
      val lastOffset = offsetCounter.addAndGet(validatedRecords.size) - 1

      batch.setLastOffset(lastOffset)

      if (timestampType == TimestampType.LOG_APPEND_TIME)
        maxTimestamp = now

      if (toMagic >= RecordBatch.MAGIC_VALUE_V1)
        batch.setMaxTimestamp(timestampType, maxTimestamp)

      if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
        batch.setPartitionLeaderEpoch(partitionLeaderEpoch)

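      // No records were converted on this path; only the accumulated uncompressed size is recorded in the stats.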
      val recordConversionStats = new RecordConversionStats(uncompressedSizeInBytes, 0, 0)
      ValidationAndOffsetAssignResult(validatedRecords = records,
        maxTimestamp = maxTimestamp,
        shallowOffsetOfMaxTimestamp = lastOffset,
        messageSizeMaybeChanged = false,
        recordConversionStats = recordConversionStats)
    }
  }
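
Below is a minimal sketch (not part of the source) of how this validator might be exercised from code inside the kafka package, e.g. a unit test. The topic name, record contents, and the choice of GZIP for both the source and target codec are illustrative assumptions, and imports for the kafka-internal types (LongRef, GZIPCompressionCodec, AppendOrigin, ApiVersion, BrokerTopicStats) are elided and assumed to resolve as they do in LogValidator.scala. With matching codecs and magic v2 input, the call should take the in-place assignment path shown above.

  import java.nio.charset.StandardCharsets.UTF_8
  import org.apache.kafka.common.TopicPartition
  import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord, TimestampType}
  import org.apache.kafka.common.utils.Time

  // A single gzip-compressed v2 batch with one record (key "k", value "v").
  val records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, CompressionType.GZIP,
    new SimpleRecord(Time.SYSTEM.milliseconds(), "k".getBytes(UTF_8), "v".getBytes(UTF_8)))

  val result = LogValidator.validateMessagesAndAssignOffsetsCompressed(
    records = records,
    topicPartition = new TopicPartition("example-topic", 0),       // assumed topic and partition
    offsetCounter = new LongRef(0L),                                // next offset to assign
    time = Time.SYSTEM,
    now = Time.SYSTEM.milliseconds(),
    sourceCodec = GZIPCompressionCodec,
    targetCodec = GZIPCompressionCodec,                             // same codec, so in-place assignment applies
    compactedTopic = false,
    toMagic = RecordBatch.MAGIC_VALUE_V2,
    timestampType = TimestampType.CREATE_TIME,
    timestampDiffMaxMs = Long.MaxValue,
    partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
    origin = AppendOrigin.Client,
    interBrokerProtocolVersion = ApiVersion.latestVersion,
    brokerTopicStats = new BrokerTopicStats)

  // Because the source and target codecs match and the input is already magic v2,
  // the original buffer is reused with updated offsets and no size change is reported.
  assert(!result.messageSizeMaybeChanged)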