in core/src/main/scala/kafka/server/KafkaApis.scala [870:992]
def handleTopicMetadataRequest(request: RequestChannel.Request): Unit = {
val metadataRequest = request.body[MetadataRequest]
val requestVersion = request.header.apiVersion
// Topic IDs are not supported for versions 10 and 11. Topic names can not be null in these versions.
if (!metadataRequest.isAllTopics) {
metadataRequest.data.topics.forEach{ topic =>
if (topic.name == null && metadataRequest.version < 12) {
throw new InvalidRequestException(s"Topic name can not be null for version ${metadataRequest.version}")
} else if (topic.topicId != Uuid.ZERO_UUID && metadataRequest.version < 12) {
throw new InvalidRequestException(s"Topic IDs are not supported in requests for version ${metadataRequest.version}")
}
}
}
// Check if topicId is presented firstly.
val topicIds = metadataRequest.topicIds.asScala.toSet.filterNot(_ == Uuid.ZERO_UUID)
val useTopicId = topicIds.nonEmpty
// Only get topicIds and topicNames when supporting topicId
val unknownTopicIds = topicIds.filter(metadataCache.getTopicName(_).isEmpty)
val knownTopicNames = topicIds.flatMap(id => OptionConverters.toScala(metadataCache.getTopicName(id)))
val unknownTopicIdsTopicMetadata = unknownTopicIds.map(topicId =>
metadataResponseTopic(Errors.UNKNOWN_TOPIC_ID, null, topicId, isInternal = false, util.Collections.emptyList())).toSeq
val topics = if (metadataRequest.isAllTopics)
metadataCache.getAllTopics.asScala
else if (useTopicId)
knownTopicNames
else
metadataRequest.topics.asScala.toSet
val authorizedForDescribeTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
topics, logIfDenied = !metadataRequest.isAllTopics)(identity)
var (authorizedTopics, unauthorizedForDescribeTopics) = topics.partition(authorizedForDescribeTopics.contains)
var unauthorizedForCreateTopics = Set[String]()
if (authorizedTopics.nonEmpty) {
val nonExistingTopics = authorizedTopics.filterNot(metadataCache.contains)
if (metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) {
if (!authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false)) {
val authorizedForCreateTopics = authHelper.filterByAuthorized(request.context, CREATE, TOPIC,
nonExistingTopics)(identity)
unauthorizedForCreateTopics = nonExistingTopics.diff(authorizedForCreateTopics)
authorizedTopics = authorizedTopics.diff(unauthorizedForCreateTopics)
}
}
}
val unauthorizedForCreateTopicMetadata = unauthorizedForCreateTopics.map(topic =>
// Set topicId to zero since we will never create topic which topicId
metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, Uuid.ZERO_UUID, isInternal(topic), util.Collections.emptyList()))
// do not disclose the existence of topics unauthorized for Describe, so we've not even checked if they exist or not
val unauthorizedForDescribeTopicMetadata =
// In case of all topics, don't include topics unauthorized for Describe
if ((requestVersion == 0 && (metadataRequest.topics == null || metadataRequest.topics.isEmpty)) || metadataRequest.isAllTopics)
Set.empty[MetadataResponseTopic]
else if (useTopicId) {
// Topic IDs are not considered sensitive information, so returning TOPIC_AUTHORIZATION_FAILED is OK
unauthorizedForDescribeTopics.map(topic =>
metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, null, metadataCache.getTopicId(topic), isInternal = false, util.Collections.emptyList()))
} else {
// We should not return topicId when on unauthorized error, so we return zero uuid.
unauthorizedForDescribeTopics.map(topic =>
metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, Uuid.ZERO_UUID, isInternal = false, util.Collections.emptyList()))
}
// In version 0, we returned an error when brokers with replicas were unavailable,
// while in higher versions we simply don't include the broker in the returned broker list
val errorUnavailableEndpoints = requestVersion == 0
// In versions 5 and below, we returned LEADER_NOT_AVAILABLE if a matching listener was not found on the leader.
// From version 6 onwards, we return LISTENER_NOT_FOUND to enable diagnosis of configuration errors.
val errorUnavailableListeners = requestVersion >= 6
val allowAutoCreation = config.autoCreateTopicsEnable && metadataRequest.allowAutoTopicCreation && !metadataRequest.isAllTopics
val topicMetadata = getTopicMetadata(request, metadataRequest.isAllTopics, allowAutoCreation, authorizedTopics,
request.context.listenerName, errorUnavailableEndpoints, errorUnavailableListeners)
var clusterAuthorizedOperations = Int.MinValue // Default value in the schema
if (requestVersion >= 8) {
// get cluster authorized operations
if (requestVersion <= 10) {
if (metadataRequest.data.includeClusterAuthorizedOperations) {
if (authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME))
clusterAuthorizedOperations = authHelper.authorizedOperations(request, Resource.CLUSTER)
else
clusterAuthorizedOperations = 0
}
}
// get topic authorized operations
if (metadataRequest.data.includeTopicAuthorizedOperations) {
def setTopicAuthorizedOperations(topicMetadata: Seq[MetadataResponseTopic]): Unit = {
topicMetadata.foreach { topicData =>
topicData.setTopicAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.TOPIC, topicData.name)))
}
}
setTopicAuthorizedOperations(topicMetadata)
}
}
val completeTopicMetadata = unknownTopicIdsTopicMetadata ++
topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata
val brokers = metadataCache.getAliveBrokerNodes(request.context.listenerName)
trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(completeTopicMetadata.mkString(","),
brokers.asScala.mkString(","), request.header.correlationId, request.header.clientId))
val controllerId = metadataCache.getRandomAliveBrokerId.orElse(MetadataResponse.NO_CONTROLLER_ID)
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
MetadataResponse.prepareResponse(
requestVersion,
requestThrottleMs,
brokers,
clusterId,
controllerId,
completeTopicMetadata.asJava,
clusterAuthorizedOperations
))
}