in core/src/main/scala/kafka/server/AbstractFetcherThread.scala [311:468]
private[server] def processFetchRequest(sessionPartitions: util.Map[TopicPartition, FetchRequest.PartitionData],
                                        fetchRequest: FetchRequest.Builder): Unit = {
  val partitionsWithError = mutable.Set[TopicPartition]()
  val divergingEndOffsets = mutable.Map.empty[TopicPartition, EpochEndOffset]
  var responseData: Map[TopicPartition, FetchData] = Map.empty

  try {
    trace(s"Sending fetch request $fetchRequest")
    responseData = leader.fetch(fetchRequest)
  } catch {
    case t: Throwable =>
      if (isRunning) {
        warn(s"Error in response for fetch request $fetchRequest", t)
        inLock(partitionMapLock) {
          partitionsWithError ++= partitionStates.partitionSet.asScala
        }
      }
  }
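  // The request-rate meter counts every fetch attempt, including requests that failed above.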
  fetcherStats.requestRate.mark()

  if (responseData.nonEmpty) {
    // process fetched data
    inLock(partitionMapLock) {
      responseData.foreachEntry { (topicPartition, partitionData) =>
        Option(partitionStates.stateValue(topicPartition)).foreach { currentFetchState =>
          // It's possible that a partition is removed and re-added or truncated while a fetch request is pending.
          // In that case, we only want to process the fetch response if:
          // - the partition state is ready for fetch
          // - the current fetch offset is the same as the offset that was requested
          // - the current leader epoch matches the requested leader epoch (or the request did not specify one)
          val fetchPartitionData = sessionPartitions.get(topicPartition)
          if (fetchPartitionData != null &&
            fetchPartitionData.fetchOffset == currentFetchState.fetchOffset &&
            fetchPartitionData.currentLeaderEpoch.map[Boolean](_ == currentFetchState.currentLeaderEpoch).orElse(true) &&
            currentFetchState.isReadyForFetch) {
            Errors.forCode(partitionData.errorCode) match {
              case Errors.NONE =>
                try {
                  if (leader.isTruncationOnFetchSupported && FetchResponse.isDivergingEpoch(partitionData)) {
                    // If a diverging epoch is present, we truncate the log of the replica
                    // but we don't process the partition data in order to not update the
                    // low/high watermarks until the truncation is actually done. Those will
                    // be updated by the next fetch.
                    divergingEndOffsets += topicPartition -> new EpochEndOffset()
                      .setPartition(topicPartition.partition)
                      .setErrorCode(Errors.NONE.code)
                      .setLeaderEpoch(partitionData.divergingEpoch.epoch)
                      .setEndOffset(partitionData.divergingEpoch.endOffset)
                  } else {
                    /* Once we hand off the partition data to the subclass, we must not touch it again in this thread.
                     *
                     * When appending batches to the log, only append record batches up to the leader epoch that was
                     * current when the FETCH request was handled. This ensures that logs do not become inconsistent
                     * because of a log truncation and append that happen after the FETCH request was handled.
                     * See KAFKA-18723 for more details.
                     */
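                    // Hand the fetched records to the subclass (e.g. ReplicaFetcherThread or ReplicaAlterLogDirsThread)
                    // for appending to the log; the returned append info drives the fetch-state update below.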
                    val logAppendInfoOpt = processPartitionData(
                      topicPartition,
                      currentFetchState.fetchOffset,
                      currentFetchState.currentLeaderEpoch,
                      partitionData
                    )

                    logAppendInfoOpt.foreach { logAppendInfo =>
                      val validBytes = logAppendInfo.validBytes
                      val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
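                      // Lag is measured against the leader's high watermark in this response, clamped at zero in
                      // case the next fetch offset is already ahead of the reported high watermark.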
                      val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
                      fetcherLagStats.getAndMaybePut(topicPartition).lag = lag

                      // ReplicaAlterLogDirsThread may have removed topicPartition from the partitionStates after processing the partition data
                      if ((validBytes > 0 || currentFetchState.lag.isEmpty) && partitionStates.contains(topicPartition)) {
                        val lastFetchedEpoch =
                          if (logAppendInfo.lastLeaderEpoch.isPresent) logAppendInfo.lastLeaderEpoch else currentFetchState.lastFetchedEpoch
                        // Update partitionStates only if there is no exception during processPartitionData
                        val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
                          currentFetchState.currentLeaderEpoch, state = Fetching, lastFetchedEpoch)
                        partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
                        if (validBytes > 0) fetcherStats.byteRate.mark(validBytes)
                      }
                    }
                  }
                } catch {
                  case ime@(_: CorruptRecordException | _: InvalidRecordException) =>
                    // We log the error and continue. This ensures two things:
                    // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread
                    //    down or cause other topic partitions to lag as well.
                    // 2. If the message is corrupt due to a transient state of the log (truncation or partial writes
                    //    can cause this), we simply continue and the problem should be fixed by subsequent fetches.
                    error(s"Found invalid messages during fetch for partition $topicPartition " +
                      s"offset ${currentFetchState.fetchOffset}", ime)
                    partitionsWithError += topicPartition
                  case e: KafkaStorageException =>
                    error(s"Error while processing data for partition $topicPartition " +
                      s"at offset ${currentFetchState.fetchOffset}", e)
                    markPartitionFailed(topicPartition)
                  case t: Throwable =>
                    // stop monitoring this partition and add it to the set of failed partitions
                    error(s"Unexpected error occurred while processing data for partition $topicPartition " +
                      s"at offset ${currentFetchState.fetchOffset}", t)
                    markPartitionFailed(topicPartition)
                }
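              // The requested fetch offset is outside the range of offsets the leader maintains; the handler
              // attempts to reset the fetch offset to one the leader can serve, and on failure the partition
              // goes back into partitionsWithError to be retried.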
              case Errors.OFFSET_OUT_OF_RANGE =>
                if (!handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch))
                  partitionsWithError += topicPartition
              case Errors.UNKNOWN_LEADER_EPOCH =>
                debug(s"Remote broker has a smaller leader epoch for partition $topicPartition than " +
                  s"this replica's current leader epoch of ${currentFetchState.currentLeaderEpoch}.")
                partitionsWithError += topicPartition
              case Errors.FENCED_LEADER_EPOCH =>
                if (onPartitionFenced(topicPartition, fetchPartitionData.currentLeaderEpoch))
                  partitionsWithError += topicPartition
              case Errors.OFFSET_MOVED_TO_TIERED_STORAGE =>
                debug(s"Received error ${Errors.OFFSET_MOVED_TO_TIERED_STORAGE}, " +
                  s"at fetch offset: ${currentFetchState.fetchOffset}, " + s"topic-partition: $topicPartition")
                if (!handleOffsetsMovedToTieredStorage(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch, partitionData))
                  partitionsWithError += topicPartition
              case Errors.NOT_LEADER_OR_FOLLOWER =>
                debug(s"Remote broker is not the leader for partition $topicPartition, which could indicate " +
                  "that the partition is being moved")
                partitionsWithError += topicPartition
              case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
                warn(s"Received ${Errors.UNKNOWN_TOPIC_OR_PARTITION} from the leader for partition $topicPartition. " +
                  "This error may be returned transiently when the partition is being created or deleted, but it is not " +
                  "expected to persist.")
                partitionsWithError += topicPartition
              case Errors.UNKNOWN_TOPIC_ID =>
                warn(s"Received ${Errors.UNKNOWN_TOPIC_ID} from the leader for partition $topicPartition. " +
                  "This error may be returned transiently when the partition is being created or deleted, but it is not " +
                  "expected to persist.")
                partitionsWithError += topicPartition
              case Errors.INCONSISTENT_TOPIC_ID =>
                warn(s"Received ${Errors.INCONSISTENT_TOPIC_ID} from the leader for partition $topicPartition. " +
                  "This error may be returned transiently when the partition is being created or deleted, but it is not " +
                  "expected to persist.")
                partitionsWithError += topicPartition
              case partitionError =>
                error(s"Error for partition $topicPartition at offset ${currentFetchState.fetchOffset}", partitionError.exception)
                partitionsWithError += topicPartition
            }
          }
        }
      }
    }
  }
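  // Apply any truncations implied by diverging epochs in this response before the next fetch, then hand the
  // partitions that hit errors to handlePartitionsWithErrors (which typically backs them off before retrying).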
  if (divergingEndOffsets.nonEmpty)
    truncateOnFetchResponse(divergingEndOffsets)
  if (partitionsWithError.nonEmpty) {
    handlePartitionsWithErrors(partitionsWithError, "processFetchRequest")
  }
}