in core/src/main/scala/kafka/server/KafkaApis.scala [623:875]
/**
 * Handle a Fetch request from either a follower replica or a consumer.
 *
 * Followers must hold CLUSTER_ACTION on the cluster resource; consumers need READ on each topic they fetch.
 * Partitions that fail authorization or are absent from the metadata cache are answered with an error
 * without touching the replica manager. The remaining partitions are read via `replicaManager.fetchMessages`.
 * The resulting response is then quota-accounted (leader quota for followers, request and fetch quotas for
 * consumers) and lazily down-converted when the client's fetch version requires an older message format.
 *
 * @param request the incoming request; its body is expected to be a [[FetchRequest]]
 */
def handleFetchRequest(request: RequestChannel.Request): Unit = {
  val versionId = request.header.apiVersion
  val clientId = request.header.clientId
  val fetchRequest = request.body[FetchRequest]
  val fetchContext = fetchManager.newContext(
    fetchRequest.metadata,
    fetchRequest.fetchData,
    fetchRequest.toForget,
    fetchRequest.isFromFollower)

  val clientMetadata: Option[ClientMetadata] = if (versionId >= 11) {
    // Fetch API version 11 added preferred replica logic
    Some(new DefaultClientMetadata(
      fetchRequest.rackId,
      clientId,
      request.context.clientAddress,
      request.context.principal,
      request.context.listenerName.value))
  } else {
    None
  }

  // Empty partition response (invalid offsets, no records) carrying only the given error.
  def errorResponse[T >: MemoryRecords <: BaseRecords](error: Errors): FetchResponse.PartitionData[T] = {
    new FetchResponse.PartitionData[T](error, FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
      FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
  }

  // `erroneous`: partitions answered with an error up front; `interesting`: partitions to read from the log.
  val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponse.PartitionData[Records])]()
  val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
  if (fetchRequest.isFromFollower) {
    // The follower must have ClusterAction on ClusterResource in order to fetch partition data.
    if (authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
      fetchContext.foreachPartition { (topicPartition, data) =>
        if (!metadataCache.contains(topicPartition))
          erroneous += topicPartition -> errorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
        else
          interesting += (topicPartition -> data)
      }
    } else {
      fetchContext.foreachPartition { (part, _) =>
        erroneous += part -> errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
      }
    }
  } else {
    // Regular Kafka consumers need READ permission on each partition they are fetching.
    val partitionMap = new mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]
    fetchContext.foreachPartition { (topicPartition, partitionData) =>
      partitionMap += topicPartition -> partitionData
    }
    val authorizedTopics = filterByAuthorized(request.context, READ, TOPIC, partitionMap)(_._1.topic)
    partitionMap.foreach { case (topicPartition, data) =>
      if (!authorizedTopics.contains(topicPartition.topic))
        erroneous += topicPartition -> errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
      else if (!metadataCache.contains(topicPartition))
        erroneous += topicPartition -> errorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
      else
        interesting += (topicPartition -> data)
    }
  }

  def maybeDownConvertStorageError(error: Errors, version: Short): Errors = {
    // If consumer sends FetchRequest V5 or earlier, the client library is not guaranteed to recognize the error code
    // for KafkaStorageException. In this case the client library will translate KafkaStorageException to
    // UnknownServerException which is not retriable. We can ensure that consumer will update metadata and retry
    // by converting the KafkaStorageException to NotLeaderOrFollowerException in the response if FetchRequest version <= 5
    if (error == Errors.KAFKA_STORAGE_ERROR && version <= 5) Errors.NOT_LEADER_OR_FOLLOWER
    else error
  }

  def maybeConvertFetchedData(tp: TopicPartition,
                              partitionData: FetchResponse.PartitionData[Records]): FetchResponse.PartitionData[BaseRecords] = {
    // Never consult the log config of a topic the client may not read. Otherwise the ZStd check below could
    // replace TOPIC_AUTHORIZATION_FAILED with UNSUPPORTED_COMPRESSION_TYPE and reveal that the topic exists and
    // how it is configured. Authorization-failed partitions carry no records, so there is nothing to convert.
    val logConfig =
      if (partitionData.error == Errors.TOPIC_AUTHORIZATION_FAILED) None
      else replicaManager.getLogConfig(tp)

    if (logConfig.exists(_.compressionType == ZStdCompressionCodec.name) && versionId < 10) {
      trace(s"Fetching messages is disabled for ZStandard compressed partition $tp. Sending unsupported version response to $clientId.")
      errorResponse(Errors.UNSUPPORTED_COMPRESSION_TYPE)
    } else {
      // Down-conversion of the fetched records is needed when the stored magic version is
      // greater than that supported by the client (as indicated by the fetch request version). If the
      // configured magic version for the topic is less than or equal to that supported by the version of the
      // fetch request, we skip the iteration through the records in order to check the magic version since we
      // know it must be supported. However, if the magic version is changed from a higher version back to a
      // lower version, this check will no longer be valid and we will fail to down-convert the messages
      // which were written in the new format prior to the version downgrade.
      val unconvertedRecords = partitionData.records
      val downConvertMagic =
        logConfig.map(_.messageFormatVersion.recordVersion.value).flatMap { magic =>
          if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0))
            Some(RecordBatch.MAGIC_VALUE_V0)
          else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1))
            Some(RecordBatch.MAGIC_VALUE_V1)
          else
            None
        }

      downConvertMagic match {
        case Some(magic) =>
          // For fetch requests from clients, check if down-conversion is disabled for the particular partition
          if (!fetchRequest.isFromFollower && logConfig.exists(!_.messageDownConversionEnable)) {
            trace(s"Conversion to message format $magic is disabled for partition $tp. Sending unsupported version response to $clientId.")
            errorResponse(Errors.UNSUPPORTED_VERSION)
          } else {
            try {
              trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId")
              // Because down-conversion is extremely memory intensive, we want to try and delay the down-conversion as much
              // as possible. With KIP-283, we have the ability to lazily down-convert in a chunked manner. The lazy, chunked
              // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the
              // client.
              val error = maybeDownConvertStorageError(partitionData.error, versionId)
              new FetchResponse.PartitionData[BaseRecords](error, partitionData.highWatermark,
                partitionData.lastStableOffset, partitionData.logStartOffset,
                partitionData.preferredReadReplica, partitionData.abortedTransactions,
                new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time))
            } catch {
              case e: UnsupportedCompressionTypeException =>
                trace("Received unsupported compression type error during down-conversion", e)
                errorResponse(Errors.UNSUPPORTED_COMPRESSION_TYPE)
            }
          }
        case None =>
          val error = maybeDownConvertStorageError(partitionData.error, versionId)
          new FetchResponse.PartitionData[BaseRecords](error, partitionData.highWatermark,
            partitionData.lastStableOffset, partitionData.logStartOffset,
            partitionData.preferredReadReplica, partitionData.abortedTransactions,
            unconvertedRecords)
      }
    }
  }

  // the callback for process a fetch response, invoked before throttling
  def processResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
    val partitions = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
    val reassigningPartitions = mutable.Set[TopicPartition]()
    responsePartitionData.foreach { case (tp, data) =>
      val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull
      val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
      if (data.isReassignmentFetch)
        reassigningPartitions.add(tp)
      val error = maybeDownConvertStorageError(data.error, versionId)
      partitions.put(tp, new FetchResponse.PartitionData(error, data.highWatermark, lastStableOffset,
        data.logStartOffset, data.preferredReadReplica.map(int2Integer).asJava,
        abortedTransactions, data.records))
    }
    erroneous.foreach { case (tp, data) => partitions.put(tp, data) }

    // Down-converts each partition of `unconvertedFetchResponse` as needed and records bytes-out metrics.
    // Called only once we know the response will actually be sent.
    def createResponse(unconvertedFetchResponse: FetchResponse[Records], throttleTimeMs: Int): FetchResponse[BaseRecords] = {
      // Down-convert messages for each partition if required
      val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[BaseRecords]]
      unconvertedFetchResponse.responseData.forEach { (tp, unconvertedPartitionData) =>
        if (unconvertedPartitionData.error != Errors.NONE)
          debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
            s"on partition $tp failed due to ${unconvertedPartitionData.error.exceptionName}")
        convertedData.put(tp, maybeConvertFetchedData(tp, unconvertedPartitionData))
      }

      // Prepare fetch response from converted data
      val response = new FetchResponse(unconvertedFetchResponse.error, convertedData, throttleTimeMs,
        unconvertedFetchResponse.sessionId)
      // record the bytes out metrics only when the response is being sent
      response.responseData.forEach { (tp, data) =>
        brokerTopicStats.updateBytesOut(tp.topic, fetchRequest.isFromFollower, reassigningPartitions.contains(tp), data.records.sizeInBytes)
      }
      response
    }

    def updateConversionStats(send: Send): Unit = {
      send match {
        case send: MultiRecordsSend if send.recordConversionStats != null =>
          send.recordConversionStats.asScala.toMap.foreach {
            case (tp, stats) => updateRecordConversionStats(request, tp, stats)
          }
        case _ =>
      }
    }

    if (fetchRequest.isFromFollower) {
      // We've already evaluated against the quota and are good to go. Just need to record it now.
      val unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
      val responseSize = sizeOfThrottledPartitions(versionId, unconvertedFetchResponse, quotas.leader)
      quotas.leader.record(responseSize)
      trace(s"Sending Fetch response with partitions.size=${unconvertedFetchResponse.responseData.size}, " +
        s"metadata=${unconvertedFetchResponse.sessionId}")
      sendResponseExemptThrottle(request, createResponse(unconvertedFetchResponse, 0), Some(updateConversionStats))
    } else {
      // Fetch size used to determine throttle time is calculated before any down conversions.
      // This may be slightly different from the actual response size. But since down conversions
      // result in data being loaded into memory, we should do this only when we are not going to throttle.
      //
      // Record both bandwidth and request quota-specific values and throttle by muting the channel if any of the
      // quotas have been violated. If both quotas have been violated, use the max throttle time between the two
      // quotas. When throttled, we unrecord the recorded bandwidth quota value
      val responseSize = fetchContext.getResponseSize(partitions, versionId)
      val timeMs = time.milliseconds()
      val requestThrottleTimeMs = quotas.request.maybeRecordAndGetThrottleTimeMs(request, timeMs)
      val bandwidthThrottleTimeMs = quotas.fetch.maybeRecordAndGetThrottleTimeMs(request, responseSize, timeMs)

      val maxThrottleTimeMs = math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs)
      val unconvertedFetchResponse: FetchResponse[Records] = if (maxThrottleTimeMs > 0) {
        request.apiThrottleTimeMs = maxThrottleTimeMs
        // Even if we need to throttle for request quota violation, we should "unrecord" the already recorded value
        // from the fetch quota because we are going to return an empty response.
        quotas.fetch.unrecordQuotaSensor(request, responseSize, timeMs)
        if (bandwidthThrottleTimeMs > requestThrottleTimeMs) {
          quotas.fetch.throttle(request, bandwidthThrottleTimeMs, requestChannel.sendResponse)
        } else {
          quotas.request.throttle(request, requestThrottleTimeMs, requestChannel.sendResponse)
        }
        // If throttling is required, return an empty response.
        fetchContext.getThrottledResponse(maxThrottleTimeMs)
      } else {
        // Get the actual response. This will update the fetch context.
        val fullResponse = fetchContext.updateAndGenerateResponseData(partitions)
        trace(s"Sending Fetch response with partitions.size=${fullResponse.responseData.size}, " +
          s"responseSize=$responseSize, metadata=${fullResponse.sessionId}")
        fullResponse
      }

      // Send the response immediately.
      sendResponse(request, Some(createResponse(unconvertedFetchResponse, maxThrottleTimeMs)), Some(updateConversionStats))
    }
  }

  // For fetches from consumers, cap fetchMaxBytes at the maximum number of bytes that could be fetched without
  // being throttled, assuming no bytes were recorded in the recent quota window. Trying to fetch more bytes
  // would guarantee throttling, potentially blocking consumer progress.
  val maxQuotaWindowBytes = if (fetchRequest.isFromFollower)
    Int.MaxValue
  else
    quotas.fetch.getMaxValueInQuotaWindow(request.session, clientId).toInt

  val fetchMaxBytes = Math.min(Math.min(fetchRequest.maxBytes, config.fetchMaxBytes), maxQuotaWindowBytes)
  val fetchMinBytes = Math.min(fetchRequest.minBytes, fetchMaxBytes)
  if (interesting.isEmpty)
    processResponseCallback(Seq.empty)
  else {
    // call the replica manager to fetch messages from the local replica
    replicaManager.fetchMessages(
      fetchRequest.maxWait.toLong,
      fetchRequest.replicaId,
      fetchMinBytes,
      fetchMaxBytes,
      versionId <= 2,
      interesting,
      replicationQuota(fetchRequest),
      processResponseCallback,
      fetchRequest.isolationLevel,
      clientMetadata)
  }
}