in core/src/main/scala/kafka/server/DelayedFetch.scala [77:154]
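  /**
   * A delayed fetch operation can be completed in the following cases (the labels
   * match the branch comments in the method body):
   *
   * Case A / Case B: this broker is no longer the leader or follower for a fetched partition
   * Case C: this broker no longer knows of a fetched partition
   * Case D: a fetched partition is in an offline log directory
   * Case E: the requested leader epoch has been fenced by a newer epoch
   * Case F: the fetch offset is no longer on the last (active) segment of the log
   * Case G: the accumulated bytes across all fetched partitions reach params.minBytes
   * Case H: a diverging epoch is detected, so a response is returned to trigger truncation
   *
   * Upon completion, the operation returns whatever data is available for each valid partition.
   */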
  override def tryComplete(): Boolean = {
    var accumulatedSize = 0
    fetchPartitionStatus.foreach {
      case (topicIdPartition, fetchStatus) =>
        val fetchOffset = fetchStatus.startOffsetMetadata
        val fetchLeaderEpoch = fetchStatus.fetchInfo.currentLeaderEpoch
        try {
          if (fetchOffset != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
            val partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition)
            val offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch, params.fetchOnlyLeader)
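            // Note: getPartitionOrException and fetchOffsetSnapshot throw the exceptions
            // handled by the catch block below (Cases A-E), e.g. FencedLeaderEpochException
            // when fetchLeaderEpoch is older than the partition's current leader epoch.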
            val endOffset = params.isolation match {
              case FetchIsolation.LOG_END => offsetSnapshot.logEndOffset
              case FetchIsolation.HIGH_WATERMARK => offsetSnapshot.highWatermark
              case FetchIsolation.TXN_COMMITTED => offsetSnapshot.lastStableOffset
            }
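            // For example, a follower fetch reads up to the log end offset, a consumer with
            // isolation.level=read_uncommitted up to the high watermark, and a consumer with
            // isolation.level=read_committed up to the last stable offset (LSO).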
            // Go directly to the check for Case G if the message offsets are the same. If the log segment
            // has just rolled, then the high watermark offset will remain the same but be on the old segment,
            // which would incorrectly be seen as an instance of Case F.
            if (fetchOffset.messageOffset > endOffset.messageOffset) {
              // Case F, this can happen when the new fetch operation is on a truncated leader
              debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
              return forceComplete()
            } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
              if (fetchOffset.onOlderSegment(endOffset)) {
                // Case F, this can happen when the fetch operation is falling behind the current segment
                // or the partition has just rolled a new segment
                debug(s"Satisfying fetch $this immediately since it is fetching older segments.")
                // We will not force complete the fetch request if a replica should be throttled.
                if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
                  return forceComplete()
              } else if (fetchOffset.onSameSegment(endOffset)) {
                // We take the partition fetch size as the upper bound when accumulating bytes
                // (skipped if the partition is throttled).
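                // For example, if 512 new bytes were appended past fetchOffset and
                // fetchInfo.maxBytes is 1 MiB, this partition contributes 512 bytes
                // towards the params.minBytes threshold checked in Case G at the end.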
                val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
                if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
                  accumulatedSize += bytesAvailable
              }
            }
            // Case H: If truncation has caused a diverging epoch while this request was in purgatory, return to trigger truncation.
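            // A diverging epoch means the leader's log no longer contains the fetcher's last
            // fetched epoch up to its fetch offset; the fetcher must truncate before it can
            // make further progress, so the pending fetch is completed with that signal.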
            fetchStatus.fetchInfo.lastFetchedEpoch.ifPresent { fetchEpoch =>
              val epochEndOffset = partition.lastOffsetForLeaderEpoch(fetchLeaderEpoch, fetchEpoch, fetchOnlyFromLeader = false)
              if (epochEndOffset.errorCode != Errors.NONE.code()
                  || epochEndOffset.endOffset == UNDEFINED_EPOCH_OFFSET
                  || epochEndOffset.leaderEpoch == UNDEFINED_EPOCH) {
                debug(s"Could not obtain last offset for leader epoch for partition $topicIdPartition, epochEndOffset=$epochEndOffset.")
                return forceComplete()
              } else if (epochEndOffset.leaderEpoch < fetchEpoch || epochEndOffset.endOffset < fetchStatus.fetchInfo.fetchOffset) {
                debug(s"Satisfying fetch $this since it has diverging epoch requiring truncation for partition " +
                  s"$topicIdPartition epochEndOffset=$epochEndOffset fetchEpoch=$fetchEpoch fetchOffset=${fetchStatus.fetchInfo.fetchOffset}.")
                return forceComplete()
              }
            }
          }
        } catch {
          case _: NotLeaderOrFollowerException => // Case A or Case B
            debug(s"Broker is no longer the leader or follower of $topicIdPartition, satisfy $this immediately")
            return forceComplete()
          case _: UnknownTopicOrPartitionException => // Case C
            debug(s"Broker no longer knows of partition $topicIdPartition, satisfy $this immediately")
            return forceComplete()
          case _: KafkaStorageException => // Case D
            debug(s"Partition $topicIdPartition is in an offline log directory, satisfy $this immediately")
            return forceComplete()
          case _: FencedLeaderEpochException => // Case E
            debug(s"Broker is the leader of partition $topicIdPartition, but the requested epoch " +
              s"$fetchLeaderEpoch is fenced by the latest leader epoch, satisfy $this immediately")
            return forceComplete()
        }
    }

    // Case G
    if (accumulatedSize >= params.minBytes)
      forceComplete()
    else
      false
  }
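
  // A minimal sketch of how this operation is typically driven (hypothetical call site,
  // not verbatim from ReplicaManager): the fetch path parks the request in a delayed
  // operation purgatory keyed by partition, and subsequent appends call checkAndComplete,
  // which re-runs tryComplete() above.
  //
  //   val delayedFetch = new DelayedFetch(params, fetchPartitionStatus, replicaManager, quota, responseCallback)
  //   val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp.topicPartition) }
  //   delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)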