in core/src/main/scala/kafka/server/KafkaApis.scala [3014:3189]
def handleShareFetchRequest(request: RequestChannel.Request): Unit = {
val shareFetchRequest = request.body[ShareFetchRequest]
if (!isShareGroupProtocolEnabled) {
requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception))
return
}
val groupId = shareFetchRequest.data.groupId
// Share Fetch needs permission to perform the READ action on the named group resource (groupId)
if (!authHelper.authorize(request.context, READ, GROUP, groupId)) {
requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.GROUP_AUTHORIZATION_FAILED.exception))
return
}
val memberId = shareFetchRequest.data.memberId
val shareSessionEpoch = shareFetchRequest.data.shareSessionEpoch
def isAcknowledgeDataPresentInFetchRequest: Boolean = {
shareFetchRequest.data.topics.asScala
.flatMap(t => t.partitions().asScala)
.exists(partition => partition.acknowledgementBatches != null && !partition.acknowledgementBatches.isEmpty)
}
val isAcknowledgeDataPresent = isAcknowledgeDataPresentInFetchRequest
val topicIdNames = metadataCache.topicIdsToNames()
val shareFetchData = shareFetchRequest.shareFetchData(topicIdNames)
val forgottenTopics = shareFetchRequest.forgottenTopics(topicIdNames)
val newReqMetadata: ShareRequestMetadata = new ShareRequestMetadata(Uuid.fromString(memberId), shareSessionEpoch)
var shareFetchContext: ShareFetchContext = null
try {
// Creating the shareFetchContext for Share Session Handling. if context creation fails, the request is failed directly here.
shareFetchContext = sharePartitionManager.newContext(groupId, shareFetchData, forgottenTopics, newReqMetadata, isAcknowledgeDataPresent, request.context.connectionId)
} catch {
case e: Exception =>
requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e))
return
}
val erroneousAndValidPartitionData: ErroneousAndValidPartitionData = shareFetchContext.getErroneousAndValidTopicIdPartitions
val topicIdPartitionSeq: mutable.Set[TopicIdPartition] = mutable.Set()
erroneousAndValidPartitionData.erroneous.forEach {
case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp
}
erroneousAndValidPartitionData.validTopicIdPartitions.forEach(tp => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp)
shareFetchData.forEach { tp => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp}
// Kafka share consumers need READ permission on each topic they are fetching.
val authorizedTopics = authHelper.filterByAuthorized(
request.context,
READ,
TOPIC,
topicIdPartitionSeq
)(_.topicPartition.topic)
// Variable to store the topic partition wise result of piggybacked acknowledgements.
var acknowledgeResult: CompletableFuture[Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]] =
CompletableFuture.completedFuture(mutable.Map.empty)
// Handling the Acknowledgements from the ShareFetchRequest If this check is true, we are sure that this is not an
// Initial ShareFetch Request, otherwise the request would have been invalid.
if (isAcknowledgeDataPresent) {
val erroneous = mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]()
val acknowledgementDataFromRequest = getAcknowledgeBatchesFromShareFetchRequest(shareFetchRequest, topicIdNames, erroneous)
acknowledgeResult = handleAcknowledgements(
acknowledgementDataFromRequest,
erroneous,
sharePartitionManager,
authorizedTopics,
groupId,
memberId,
)
}
// Handling the Fetch from the ShareFetchRequest.
// Variable to store the topic partition wise result of fetching.
val fetchResult: CompletableFuture[Map[TopicIdPartition, ShareFetchResponseData.PartitionData]] = handleFetchFromShareFetchRequest(
request,
shareSessionEpoch,
erroneousAndValidPartitionData,
sharePartitionManager,
authorizedTopics
)
def combineShareFetchAndShareAcknowledgeResponses(fetchResult: CompletableFuture[Map[TopicIdPartition, ShareFetchResponseData.PartitionData]],
acknowledgeResult: CompletableFuture[Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]],
): CompletableFuture[ShareFetchResponse] = {
fetchResult.thenCombine(acknowledgeResult,
(fetchMap: Map[TopicIdPartition, ShareFetchResponseData.PartitionData],
acknowledgeMap: Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]) => {
val shareFetchResponse = processShareFetchResponse(fetchMap, request, topicIdNames, shareFetchContext)
// The outer map has topicId as the key and the inner map has partitionIndex as the key.
val topicPartitionAcknowledgements: mutable.Map[Uuid, mutable.Map[Int, Short]] = mutable.Map()
if (acknowledgeMap != null && acknowledgeMap.nonEmpty) {
acknowledgeMap.foreach { case (tp, partitionData) =>
topicPartitionAcknowledgements.get(tp.topicId) match {
case Some(subMap) =>
subMap += tp.partition -> partitionData.errorCode
case None =>
val partitionAcknowledgementsMap: mutable.Map[Int, Short] = mutable.Map()
partitionAcknowledgementsMap += tp.partition -> partitionData.errorCode
topicPartitionAcknowledgements += tp.topicId -> partitionAcknowledgementsMap
}
}
}
shareFetchResponse.data.responses.forEach{ topic =>
val topicId = topic.topicId
topicPartitionAcknowledgements.get(topicId) match {
case Some(subMap) =>
topic.partitions.forEach { partition =>
subMap.get(partition.partitionIndex) match {
case Some(value) =>
partition.setAcknowledgeErrorCode(value)
// Delete the element.
subMap.remove(partition.partitionIndex)
case None =>
}
}
// Add the remaining acknowledgements.
subMap.foreach { case (partitionIndex, value) =>
val fetchPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partitionIndex)
.setErrorCode(Errors.NONE.code)
.setAcknowledgeErrorCode(value)
.setRecords(MemoryRecords.EMPTY)
topic.partitions.add(fetchPartitionData)
}
topicPartitionAcknowledgements.remove(topicId)
case None =>
}
}
// Add the remaining acknowledgements.
topicPartitionAcknowledgements.foreach { case (topicId, subMap) =>
val topicData = new ShareFetchResponseData.ShareFetchableTopicResponse()
.setTopicId(topicId)
subMap.foreach { case (partitionIndex, value) =>
val fetchPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partitionIndex)
.setErrorCode(Errors.NONE.code)
.setAcknowledgeErrorCode(value)
.setRecords(MemoryRecords.EMPTY)
topicData.partitions.add(fetchPartitionData)
}
shareFetchResponse.data.responses.add(topicData)
}
if (shareSessionEpoch == ShareRequestMetadata.FINAL_EPOCH) {
sharePartitionManager.releaseSession(groupId, memberId).
whenComplete((releaseAcquiredRecordsData, throwable) =>
if (throwable != null) {
error(s"Releasing share session close with correlation from client ${request.header.clientId} " +
s"failed with error ${throwable.getMessage}")
} else {
info(s"Releasing share session close $releaseAcquiredRecordsData succeeded")
}
)
}
shareFetchResponse
})
}
// Send the response once the future completes.
combineShareFetchAndShareAcknowledgeResponses(fetchResult, acknowledgeResult).handle[Unit] {(result, exception) =>
if (exception != null) {
requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, exception))
} else {
requestChannel.sendResponse(request, result, None)
}
}
}