in core/src/main/scala/kafka/server/ReplicaManager.scala [1068:1194]
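/**
 * Read from the local log for each of the given partitions at the requested offsets, up to
 * fetchMaxBytes across the whole response. Per-partition failures are captured in the
 * returned LogReadResult instead of being thrown, so a bad partition does not fail the
 * fetch as a whole.
 */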
def readFromLocalLog(replicaId: Int,
                     fetchOnlyFromLeader: Boolean,
                     fetchIsolation: FetchIsolation,
                     fetchMaxBytes: Int,
                     hardMaxBytesLimit: Boolean,
                     readPartitionInfo: Seq[(TopicPartition, PartitionData)],
                     quota: ReplicaQuota,
                     clientMetadata: Option[ClientMetadata]): Seq[(TopicPartition, LogReadResult)] = {
  val traceEnabled = isTraceEnabled
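
  // Reads a single partition. Errors are mapped to a LogReadResult carrying the exception
  // and empty records rather than propagated; only unexpected errors increment the
  // failed-fetch metrics below.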
  def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
    val offset = fetchInfo.fetchOffset
    val partitionFetchSize = fetchInfo.maxBytes
    val followerLogStartOffset = fetchInfo.logStartOffset
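
    // Cap this partition's fetch size by whatever is left of the response-level byte budget.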
    val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes)
    try {
      if (traceEnabled)
        trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
          s"remaining response limit $limitBytes" +
          (if (minOneMessage) s", ignoring response/partition size limits" else ""))

      val partition = getPartitionOrException(tp)
      val fetchTimeMs = time.milliseconds

      // If we are the leader, determine the preferred read-replica
      val preferredReadReplica = clientMetadata.flatMap(
        metadata => findPreferredReadReplica(partition, metadata, replicaId, fetchInfo.fetchOffset, fetchTimeMs))

      if (preferredReadReplica.isDefined) {
        replicaSelectorOpt.foreach { selector =>
          debug(s"Replica selector ${selector.getClass.getSimpleName} returned preferred replica " +
            s"${preferredReadReplica.get} for $clientMetadata")
        }
        // If a preferred read-replica is set, skip the read
        val offsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, fetchOnlyFromLeader = false)
        LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
          highWatermark = offsetSnapshot.highWatermark.messageOffset,
          leaderLogStartOffset = offsetSnapshot.logStartOffset,
          leaderLogEndOffset = offsetSnapshot.logEndOffset.messageOffset,
          followerLogStartOffset = followerLogStartOffset,
          fetchTimeMs = -1L,
          lastStableOffset = Some(offsetSnapshot.lastStableOffset.messageOffset),
          preferredReadReplica = preferredReadReplica,
          exception = None)
      } else {
        // Try the read first; this tells us whether we need all of adjustedMaxBytes for this partition
        val readInfo: LogReadInfo = partition.readRecords(
          fetchOffset = fetchInfo.fetchOffset,
          currentLeaderEpoch = fetchInfo.currentLeaderEpoch,
          maxBytes = adjustedMaxBytes,
          fetchIsolation = fetchIsolation,
          fetchOnlyFromLeader = fetchOnlyFromLeader,
          minOneMessage = minOneMessage)

        val fetchDataInfo = if (shouldLeaderThrottle(quota, partition, replicaId)) {
          // If the partition is being throttled, simply return an empty set.
          FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
        } else if (!hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {
          // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
          // progress in such cases and don't need to report a `RecordTooLargeException`
          FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
        } else {
          readInfo.fetchedData
        }

        LogReadResult(info = fetchDataInfo,
          highWatermark = readInfo.highWatermark,
          leaderLogStartOffset = readInfo.logStartOffset,
          leaderLogEndOffset = readInfo.logEndOffset,
          followerLogStartOffset = followerLogStartOffset,
          fetchTimeMs = fetchTimeMs,
          lastStableOffset = Some(readInfo.lastStableOffset),
          preferredReadReplica = preferredReadReplica,
          exception = None)
      }
    } catch {
      // NOTE: Failed fetch requests metric is not incremented for known exceptions since it
      // is supposed to indicate unexpected failure of a broker in handling a fetch request
      case e@ (_: UnknownTopicOrPartitionException |
               _: NotLeaderOrFollowerException |
               _: UnknownLeaderEpochException |
               _: FencedLeaderEpochException |
               _: ReplicaNotAvailableException |
               _: KafkaStorageException |
               _: OffsetOutOfRangeException) =>
        LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
          highWatermark = Log.UnknownOffset,
          leaderLogStartOffset = Log.UnknownOffset,
          leaderLogEndOffset = Log.UnknownOffset,
          followerLogStartOffset = Log.UnknownOffset,
          fetchTimeMs = -1L,
          lastStableOffset = None,
          exception = Some(e))
      case e: Throwable =>
        brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark()
        brokerTopicStats.allTopicsStats.failedFetchRequestRate.mark()

        val fetchSource = Request.describeReplicaId(replicaId)
        error(s"Error processing fetch with max size $adjustedMaxBytes from $fetchSource " +
          s"on partition $tp: $fetchInfo", e)

        LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
          highWatermark = Log.UnknownOffset,
          leaderLogStartOffset = Log.UnknownOffset,
          leaderLogEndOffset = Log.UnknownOffset,
          followerLogStartOffset = Log.UnknownOffset,
          fetchTimeMs = -1L,
          lastStableOffset = None,
          exception = Some(e))
    }
  }
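
  // Read each partition in request order, shrinking the remaining response budget as bytes
  // accumulate. minOneMessage starts true (unless a hard size limit was requested) so the
  // first non-empty partition can return at least one message even when it exceeds the size
  // limits; this guarantees the fetch can always make progress.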
  var limitBytes = fetchMaxBytes
  val result = new mutable.ArrayBuffer[(TopicPartition, LogReadResult)]
  var minOneMessage = !hardMaxBytesLimit
  readPartitionInfo.foreach { case (tp, fetchInfo) =>
    val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)
    val recordBatchSize = readResult.info.records.sizeInBytes
    // Once we read from a non-empty partition, we stop ignoring request and partition level size limits
    if (recordBatchSize > 0)
      minOneMessage = false
    limitBytes = math.max(0, limitBytes - recordBatchSize)
    result += (tp -> readResult)
  }
  result
}