in core/src/main/scala/kafka/server/KafkaApis.scala [553:766]
def handleFetchRequest(request: RequestChannel.Request): Unit = {
val versionId = request.header.apiVersion
val clientId = request.header.clientId
val fetchRequest = request.body[FetchRequest]
val topicNames =
if (fetchRequest.version() >= 13)
metadataCache.topicIdsToNames()
else
Collections.emptyMap[Uuid, String]()
val fetchData = fetchRequest.fetchData(topicNames)
val forgottenTopics = fetchRequest.forgottenTopics(topicNames)
val fetchContext = fetchManager.newContext(
fetchRequest.version,
fetchRequest.metadata,
fetchRequest.isFromFollower,
fetchData,
forgottenTopics,
topicNames)
val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]()
val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]()
if (fetchRequest.isFromFollower) {
// The follower must have ClusterAction on ClusterResource in order to fetch partition data.
if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
fetchContext.foreachPartition { (topicIdPartition, data) =>
if (topicIdPartition.topic == null)
erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
else if (!metadataCache.contains(topicIdPartition.topicPartition))
erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
else
interesting += topicIdPartition -> data
}
} else {
fetchContext.foreachPartition { (topicIdPartition, _) =>
erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED)
}
}
} else {
// Regular Kafka consumers need READ permission on each partition they are fetching.
val partitionDatas = new mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]
fetchContext.foreachPartition { (topicIdPartition, partitionData) =>
if (topicIdPartition.topic == null)
erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
else
partitionDatas += topicIdPartition -> partitionData
}
val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, partitionDatas)(_._1.topicPartition.topic)
partitionDatas.foreach { case (topicIdPartition, data) =>
if (!authorizedTopics.contains(topicIdPartition.topic))
erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED)
else if (!metadataCache.contains(topicIdPartition.topicPartition))
erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
else
interesting += topicIdPartition -> data
}
}
def maybeDownConvertStorageError(error: Errors): Errors = {
// If consumer sends FetchRequest V5 or earlier, the client library is not guaranteed to recognize the error code
// for KafkaStorageException. In this case the client library will translate KafkaStorageException to
// UnknownServerException which is not retriable. We can ensure that consumer will update metadata and retry
// by converting the KafkaStorageException to NotLeaderOrFollowerException in the response if FetchRequest version <= 5
if (error == Errors.KAFKA_STORAGE_ERROR && versionId <= 5) {
Errors.NOT_LEADER_OR_FOLLOWER
} else {
error
}
}
// the callback for process a fetch response, invoked before throttling
def processResponseCallback(responsePartitionData: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
val partitions = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData]
val reassigningPartitions = mutable.Set[TopicIdPartition]()
val nodeEndpoints = new mutable.HashMap[Int, Node]
responsePartitionData.foreach { case (topicIdPartition, data) =>
val abortedTransactions = data.abortedTransactions.orElse(null)
val lastStableOffset: Long = data.lastStableOffset.orElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
if (data.isReassignmentFetch) reassigningPartitions.add(topicIdPartition)
val partitionData = new FetchResponseData.PartitionData()
.setPartitionIndex(topicIdPartition.partition)
.setErrorCode(maybeDownConvertStorageError(data.error).code)
.setHighWatermark(data.highWatermark)
.setLastStableOffset(lastStableOffset)
.setLogStartOffset(data.logStartOffset)
.setAbortedTransactions(abortedTransactions)
.setRecords(data.records)
.setPreferredReadReplica(data.preferredReadReplica.orElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))
if (versionId >= 16) {
data.error match {
case Errors.NOT_LEADER_OR_FOLLOWER | Errors.FENCED_LEADER_EPOCH =>
val leaderNode = getCurrentLeader(topicIdPartition.topicPartition(), request.context.listenerName)
leaderNode.node.foreach { node =>
nodeEndpoints.put(node.id(), node)
}
partitionData.currentLeader()
.setLeaderId(leaderNode.leaderId)
.setLeaderEpoch(leaderNode.leaderEpoch)
case _ =>
}
}
data.divergingEpoch.ifPresent(epoch => partitionData.setDivergingEpoch(epoch))
partitions.put(topicIdPartition, partitionData)
}
erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
def recordBytesOutMetric(fetchResponse: FetchResponse): Unit = {
// record the bytes out metrics only when the response is being sent
fetchResponse.data.responses.forEach { topicResponse =>
topicResponse.partitions.forEach { data =>
// If the topic name was not known, we will have no bytes out.
if (topicResponse.topic != null) {
val tp = new TopicIdPartition(topicResponse.topicId, new TopicPartition(topicResponse.topic, data.partitionIndex))
brokerTopicStats.updateBytesOut(tp.topic, fetchRequest.isFromFollower, reassigningPartitions.contains(tp), FetchResponse.recordsSize(data))
}
}
}
}
if (fetchRequest.isFromFollower) {
// We've already evaluated against the quota and are good to go. Just need to record it now.
val fetchResponse = fetchContext.updateAndGenerateResponseData(partitions, Seq.empty.asJava)
val responseSize = KafkaApis.sizeOfThrottledPartitions(versionId, fetchResponse, quotas.leader)
quotas.leader.record(responseSize)
val responsePartitionsSize = fetchResponse.data().responses().stream().mapToInt(_.partitions().size()).sum()
trace(s"Sending Fetch response with partitions.size=$responsePartitionsSize, " +
s"metadata=${fetchResponse.sessionId}")
recordBytesOutMetric(fetchResponse)
requestHelper.sendResponseExemptThrottle(request, fetchResponse)
} else {
// Record both bandwidth and request quota-specific values and throttle by muting the channel if any of the
// quotas have been violated. If both quotas have been violated, use the max throttle time between the two
// quotas. When throttled, we unrecord the recorded bandwidth quota value.
val responseSize = fetchContext.getResponseSize(partitions, versionId)
val timeMs = time.milliseconds()
val requestThrottleTimeMs = quotas.request.maybeRecordAndGetThrottleTimeMs(request, timeMs)
val bandwidthThrottleTimeMs = quotas.fetch.maybeRecordAndGetThrottleTimeMs(request, responseSize, timeMs)
val maxThrottleTimeMs = math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs)
val fetchResponse = if (maxThrottleTimeMs > 0) {
request.apiThrottleTimeMs = maxThrottleTimeMs
// Even if we need to throttle for request quota violation, we should "unrecord" the already recorded value
// from the fetch quota because we are going to return an empty response.
quotas.fetch.unrecordQuotaSensor(request, responseSize, timeMs)
if (bandwidthThrottleTimeMs > requestThrottleTimeMs) {
requestHelper.throttle(quotas.fetch, request, bandwidthThrottleTimeMs)
} else {
requestHelper.throttle(quotas.request, request, requestThrottleTimeMs)
}
// If throttling is required, return an empty response.
fetchContext.getThrottledResponse(maxThrottleTimeMs, nodeEndpoints.values.toSeq.asJava)
} else {
// Get the actual response. This will update the fetch context.
val fetchResponse = fetchContext.updateAndGenerateResponseData(partitions, nodeEndpoints.values.toSeq.asJava)
val responsePartitionsSize = fetchResponse.data().responses().stream().mapToInt(_.partitions().size()).sum()
trace(s"Sending Fetch response with partitions.size=$responsePartitionsSize, " +
s"metadata=${fetchResponse.sessionId}")
fetchResponse
}
recordBytesOutMetric(fetchResponse)
// Send the response immediately.
requestChannel.sendResponse(request, fetchResponse, None)
}
}
if (interesting.isEmpty) {
processResponseCallback(Seq.empty)
} else {
// for fetch from consumer, cap fetchMaxBytes to the maximum bytes that could be fetched without being throttled given
// no bytes were recorded in the recent quota window
// trying to fetch more bytes would result in a guaranteed throttling potentially blocking consumer progress
val maxQuotaWindowBytes = if (fetchRequest.isFromFollower)
Int.MaxValue
else
quotas.fetch.getMaxValueInQuotaWindow(request.session, clientId).toInt
val fetchMaxBytes = Math.min(Math.min(fetchRequest.maxBytes, config.fetchMaxBytes), maxQuotaWindowBytes)
val fetchMinBytes = Math.min(fetchRequest.minBytes, fetchMaxBytes)
val clientMetadata: Optional[ClientMetadata] = if (versionId >= 11) {
// Fetch API version 11 added preferred replica logic
Optional.of(new DefaultClientMetadata(
fetchRequest.rackId,
clientId,
request.context.clientAddress,
request.context.principal,
request.context.listenerName.value))
} else {
Optional.empty()
}
val params = new FetchParams(
fetchRequest.replicaId,
fetchRequest.replicaEpoch,
fetchRequest.maxWait,
fetchMinBytes,
fetchMaxBytes,
FetchIsolation.of(fetchRequest),
clientMetadata
)
// call the replica manager to fetch messages from the local replica
replicaManager.fetchMessages(
params = params,
fetchInfos = interesting,
quota = replicationQuota(fetchRequest),
responseCallback = processResponseCallback,
)
}
}