private[server] def processFetchRequest()

in core/src/main/scala/kafka/server/AbstractFetcherThread.scala [311:468]
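
Sends the fetch request to the leader and processes the response under partitionMapLock. Each partition's data is applied only if it still matches the current fetch state; valid batches are handed to processPartitionData, diverging epochs are collected for truncateOnFetchResponse, and partitions that hit an error are gathered for handlePartitionsWithErrors.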


  private[server] def processFetchRequest(sessionPartitions: util.Map[TopicPartition, FetchRequest.PartitionData],
                                          fetchRequest: FetchRequest.Builder): Unit = {
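    // Accumulators for this pass: partitions whose fetch failed and should be handed to
    // handlePartitionsWithErrors, and partitions whose response carried a diverging epoch
    // and should be truncated via truncateOnFetchResponse.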
    val partitionsWithError = mutable.Set[TopicPartition]()
    val divergingEndOffsets = mutable.Map.empty[TopicPartition, EpochEndOffset]
    var responseData: Map[TopicPartition, FetchData] = Map.empty

    try {
      trace(s"Sending fetch request $fetchRequest")
      responseData = leader.fetch(fetchRequest)
    } catch {
      case t: Throwable =>
        if (isRunning) {
          warn(s"Error in response for fetch request $fetchRequest", t)
          inLock(partitionMapLock) {
            partitionsWithError ++= partitionStates.partitionSet.asScala
          }
        }
    }
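    // The request is counted in the rate metric whether or not the fetch succeeded.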
    fetcherStats.requestRate.mark()

    if (responseData.nonEmpty) {
      // process fetched data
      inLock(partitionMapLock) {
        responseData.foreachEntry { (topicPartition, partitionData) =>
          Option(partitionStates.stateValue(topicPartition)).foreach { currentFetchState =>
            // It's possible that a partition is removed and re-added or truncated when there is a pending fetch request.
            // In this case, we only want to process the fetch response if:
            // - the partition state is ready for fetch
            // - the current offset is the same as the offset requested
            // - the current leader epoch is the same as the leader epoch requested
            val fetchPartitionData = sessionPartitions.get(topicPartition)
            if (fetchPartitionData != null &&
                fetchPartitionData.fetchOffset == currentFetchState.fetchOffset &&
                fetchPartitionData.currentLeaderEpoch.map[Boolean](_ == currentFetchState.currentLeaderEpoch).orElse(true) &&
                currentFetchState.isReadyForFetch) {
              Errors.forCode(partitionData.errorCode) match {
                case Errors.NONE =>
                  try {
                    if (leader.isTruncationOnFetchSupported && FetchResponse.isDivergingEpoch(partitionData)) {
                      // If a diverging epoch is present, we truncate the log of the replica,
                      // but we don't process the partition data, so that the low/high
                      // watermarks are not updated until the truncation has actually been
                      // done. They will be updated by the next fetch.
                      divergingEndOffsets += topicPartition -> new EpochEndOffset()
                        .setPartition(topicPartition.partition)
                        .setErrorCode(Errors.NONE.code)
                        .setLeaderEpoch(partitionData.divergingEpoch.epoch)
                        .setEndOffset(partitionData.divergingEpoch.endOffset)
                    } else {
                      /* Once we hand off the partition data to the subclass, we can't mess with
                       * it any more in this thread.
                       *
                       * When appending batches to the log, only append record batches up to the
                       * leader epoch that was current when the FETCH request was handled. This is
                       * done to make sure that logs do not become inconsistent because of a log
                       * truncation and append after the FETCH request was handled. See KAFKA-18723
                       * for more details.
                       */
                      val logAppendInfoOpt = processPartitionData(
                        topicPartition,
                        currentFetchState.fetchOffset,
                        currentFetchState.currentLeaderEpoch,
                        partitionData
                      )

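                      // If the subclass returned no append info, leave the fetch state untouched.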
                      logAppendInfoOpt.foreach { logAppendInfo =>
                        val validBytes = logAppendInfo.validBytes
                        val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
                        val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
                        fetcherLagStats.getAndMaybePut(topicPartition).lag = lag

                        // ReplicaAlterLogDirsThread may have removed topicPartition from the
                        // partitionStates after processing the partition data
                        if ((validBytes > 0 || currentFetchState.lag.isEmpty) && partitionStates.contains(topicPartition)) {
                          val lastFetchedEpoch =
                            if (logAppendInfo.lastLeaderEpoch.isPresent) logAppendInfo.lastLeaderEpoch else currentFetchState.lastFetchedEpoch
                          // Update partitionStates only if there is no exception during processPartitionData
                          val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
                            currentFetchState.currentLeaderEpoch, state = Fetching, lastFetchedEpoch)
                          partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
                          if (validBytes > 0) fetcherStats.byteRate.mark(validBytes)
                        }
                      }
                    }
                  } catch {
                    case ime@(_: CorruptRecordException | _: InvalidRecordException) =>
                      // We log the error and continue. This ensures two things:
                      // 1. A corrupt message in one topic partition does not bring the fetcher
                      //    thread down or cause other topic partitions to lag as well.
                      // 2. If the message is corrupt due to a transient state in the log
                      //    (truncation and partial writes can cause this), we simply continue and
                      //    the problem should be fixed by subsequent fetches.
                      error(s"Found invalid messages during fetch for partition $topicPartition " +
                        s"at offset ${currentFetchState.fetchOffset}", ime)
                      partitionsWithError += topicPartition
                    case e: KafkaStorageException =>
                      error(s"Error while processing data for partition $topicPartition " +
                        s"at offset ${currentFetchState.fetchOffset}", e)
                      markPartitionFailed(topicPartition)
                    case t: Throwable =>
                      // stop monitoring this partition and add it to the set of failed partitions
                      error(s"Unexpected error occurred while processing data for partition $topicPartition " +
                        s"at offset ${currentFetchState.fetchOffset}", t)
                      markPartitionFailed(topicPartition)
                  }
                case Errors.OFFSET_OUT_OF_RANGE =>
                  if (!handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch))
                    partitionsWithError += topicPartition

                case Errors.UNKNOWN_LEADER_EPOCH =>
                  debug(s"Remote broker has a smaller leader epoch for partition $topicPartition than " +
                    s"this replica's current leader epoch of ${currentFetchState.currentLeaderEpoch}.")
                  partitionsWithError += topicPartition

                case Errors.FENCED_LEADER_EPOCH =>
                  if (onPartitionFenced(topicPartition, fetchPartitionData.currentLeaderEpoch))
                    partitionsWithError += topicPartition

                case Errors.OFFSET_MOVED_TO_TIERED_STORAGE =>
                  debug(s"Received error ${Errors.OFFSET_MOVED_TO_TIERED_STORAGE}, " +
                    s"at fetch offset: ${currentFetchState.fetchOffset}, topic-partition: $topicPartition")
                  if (!handleOffsetsMovedToTieredStorage(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch, partitionData))
                    partitionsWithError += topicPartition

                case Errors.NOT_LEADER_OR_FOLLOWER =>
                  debug(s"Remote broker is not the leader for partition $topicPartition, which could indicate " +
                    "that the partition is being moved")
                  partitionsWithError += topicPartition

                case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
                  warn(s"Received ${Errors.UNKNOWN_TOPIC_OR_PARTITION} from the leader for partition $topicPartition. " +
                    "This error may be returned transiently when the partition is being created or deleted, but it is not " +
                    "expected to persist.")
                  partitionsWithError += topicPartition

                case Errors.UNKNOWN_TOPIC_ID =>
                  warn(s"Received ${Errors.UNKNOWN_TOPIC_ID} from the leader for partition $topicPartition. " +
                    "This error may be returned transiently when the partition is being created or deleted, but it is not " +
                    "expected to persist.")
                  partitionsWithError += topicPartition

                case Errors.INCONSISTENT_TOPIC_ID =>
                  warn(s"Received ${Errors.INCONSISTENT_TOPIC_ID} from the leader for partition $topicPartition. " +
                    "This error may be returned transiently when the partition is being created or deleted, but it is not " +
                    "expected to persist.")
                  partitionsWithError += topicPartition

                case partitionError =>
                  error(s"Error for partition $topicPartition at offset ${currentFetchState.fetchOffset}", partitionError.exception)
                  partitionsWithError += topicPartition
              }
            }
          }
        }
      }
    }

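    // Apply the work deferred during response processing: truncate partitions with a
    // diverging epoch, then handle the partitions that hit errors.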
    if (divergingEndOffsets.nonEmpty)
      truncateOnFetchResponse(divergingEndOffsets)
    if (partitionsWithError.nonEmpty) {
      handlePartitionsWithErrors(partitionsWithError, "processFetchRequest")
    }
  }
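
The guard near the top of the response loop (matching fetch offset, matching leader epoch when one was sent, partition ready for fetch) is what makes it safe to drop stale responses for partitions that were truncated, removed, or re-added while the request was in flight. Below is a minimal, self-contained sketch of that check; RequestedPartition, FetchState and shouldProcessResponse are hypothetical stand-ins for FetchRequest.PartitionData, PartitionFetchState and the inline condition above, not actual Kafka types.

  import java.util.Optional

  object StaleResponseCheckDemo extends App {
    // Hypothetical stand-ins for FetchRequest.PartitionData and PartitionFetchState.
    case class RequestedPartition(fetchOffset: Long, currentLeaderEpoch: Optional[Integer])
    case class FetchState(fetchOffset: Long, currentLeaderEpoch: Int, isReadyForFetch: Boolean)

    // Mirrors the condition above: the offset must still match, the epoch must match if
    // one was sent with the request (orElse(true) skips the check otherwise), and the
    // partition must still be fetchable.
    def shouldProcessResponse(requested: RequestedPartition, state: FetchState): Boolean =
      requested.fetchOffset == state.fetchOffset &&
        requested.currentLeaderEpoch.map[Boolean](_ == state.currentLeaderEpoch).orElse(true) &&
        state.isReadyForFetch

    // The partition was truncated to offset 90 while the fetch for offset 100 was in
    // flight: the stale response must be dropped.
    assert(!shouldProcessResponse(
      RequestedPartition(100L, Optional.of(Integer.valueOf(5))),
      FetchState(fetchOffset = 90L, currentLeaderEpoch = 5, isReadyForFetch = true)))

    // No epoch was sent with the request, so only the offset and readiness are checked.
    assert(shouldProcessResponse(
      RequestedPartition(100L, Optional.empty[Integer]()),
      FetchState(fetchOffset = 100L, currentLeaderEpoch = 7, isReadyForFetch = true)))
  }

In the method itself the same check runs under partitionMapLock, so the fetch state cannot change between the check and the subsequent partitionStates update.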