in core/src/main/scala/kafka/server/ConfigHelper.scala [84:177]
/**
 * Builds one `DescribeConfigsResult` per requested resource.
 *
 * Each resource is processed independently: any exception raised while handling a resource
 * is caught, logged, and converted into that resource's error code / error message with an
 * empty config list, so a failure on one resource does not prevent results for the others.
 *
 * @param resourceToConfigNames the resources to describe; when a resource's `configurationKeys`
 *                              is null or empty all of its configs are returned, otherwise only
 *                              the configs whose names appear in `configurationKeys`
 * @param includeSynonyms       forwarded to the per-resource-type config entry builders
 * @param includeDocumentation  forwarded to the per-resource-type config entry builders
 * @return the results, in the same order as `resourceToConfigNames`
 */
def describeConfigs(resourceToConfigNames: List[DescribeConfigsResource],
                    includeSynonyms: Boolean,
                    includeDocumentation: Boolean): List[DescribeConfigsResponseData.DescribeConfigsResult] = {
  resourceToConfigNames.map { resource =>

    // Turns a name -> value config map into a successful result, keeping only the keys the
    // request asked for (or every key when the request did not name any).
    def createResponseConfig(configs: Map[String, Any],
                             createConfigEntry: (String, Any) => DescribeConfigsResponseData.DescribeConfigsResourceResult): DescribeConfigsResponseData.DescribeConfigsResult = {
      val requestedKeys = resource.configurationKeys
      val filteredConfigPairs =
        if (requestedKeys == null || requestedKeys.isEmpty)
          configs.toBuffer
        else {
          // Build the lookup set once instead of scanning the Java list for every config entry.
          val requestedKeySet = requestedKeys.asScala.toSet
          configs.filter { case (configName, _) =>
            requestedKeySet.contains(configName)
          }.toBuffer
        }
      val configEntries = filteredConfigPairs.map { case (name, value) => createConfigEntry(name, value) }
      new DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(Errors.NONE.code)
        .setConfigs(configEntries.asJava)
    }

    try {
      val configResult = ConfigResource.Type.forId(resource.resourceType) match {
        case ConfigResource.Type.TOPIC =>
          val topic = resource.resourceName
          Topic.validate(topic)
          if (metadataCache.contains(topic)) {
            val topicProps = configRepository.topicConfig(topic)
            val logConfig = LogConfig.fromProps(config.extractLogConfigMap, topicProps)
            createResponseConfig(allConfigs(logConfig), createTopicConfigEntry(logConfig, topicProps, includeSynonyms, includeDocumentation))
          } else {
            // Topic is not present in the metadata cache: report it as unknown rather than throwing.
            new DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
              .setConfigs(Collections.emptyList[DescribeConfigsResponseData.DescribeConfigsResourceResult])
          }

        case ConfigResource.Type.BROKER =>
          // An empty resource name selects the dynamic default configs; otherwise the name
          // must resolve to this broker's id.
          if (resource.resourceName == null || resource.resourceName.isEmpty)
            createResponseConfig(config.dynamicConfig.currentDynamicDefaultConfigs,
              createBrokerConfigEntry(perBrokerConfig = false, includeSynonyms, includeDocumentation))
          else if (resourceNameToBrokerId(resource.resourceName) == config.brokerId)
            createResponseConfig(allConfigs(config),
              createBrokerConfigEntry(perBrokerConfig = true, includeSynonyms, includeDocumentation))
          else
            throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} or empty string, but received ${resource.resourceName}")

        case ConfigResource.Type.BROKER_LOGGER =>
          // Unlike BROKER, an empty resource name is rejected here.
          if (resource.resourceName == null || resource.resourceName.isEmpty)
            throw new InvalidRequestException("Broker id must not be empty")
          else if (resourceNameToBrokerId(resource.resourceName) != config.brokerId)
            throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} but received ${resource.resourceName}")
          else
            createResponseConfig(LoggingController.loggers,
              (name, value) => new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name)
                .setValue(value.toString).setConfigSource(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG.id)
                .setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava))

        case ConfigResource.Type.CLIENT_METRICS =>
          if (resource.resourceName == null || resource.resourceName.isEmpty) {
            throw new InvalidRequestException("Client metrics subscription name must not be empty")
          } else {
            val clientMetricsProps = configRepository.config(new ConfigResource(ConfigResource.Type.CLIENT_METRICS, resource.resourceName))
            val clientMetricsConfig = ClientMetricsConfigs.fromProps(ClientMetricsConfigs.defaultConfigsMap(), clientMetricsProps)
            createResponseConfig(allConfigs(clientMetricsConfig), createClientMetricsConfigEntry(clientMetricsConfig, clientMetricsProps, includeSynonyms, includeDocumentation))
          }

        case ConfigResource.Type.GROUP =>
          val group = resource.resourceName
          if (group == null || group.isEmpty) {
            throw new InvalidRequestException("Group name must not be empty")
          } else {
            val groupProps = configRepository.groupConfig(group)
            val groupConfig = GroupConfig.fromProps(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig), groupProps)
            createResponseConfig(allConfigs(groupConfig), createGroupConfigEntry(groupConfig, groupProps, includeSynonyms, includeDocumentation))
          }

        case resourceType => throw new InvalidRequestException(s"Unsupported resource type: $resourceType")
      }
      configResult.setResourceName(resource.resourceName).setResourceType(resource.resourceType)
    } catch {
      // NOTE(review): this catches Throwable (which includes fatal JVM errors); kept as-is so the
      // per-resource error contract is unchanged - confirm whether NonFatal would be preferable.
      case e: Throwable =>
        // Log client errors at a lower level than unexpected exceptions
        val message = s"Error processing describe configs request for resource $resource"
        e match {
          case _: ApiException => info(message, e)
          case _ => error(message, e)
        }
        val err = ApiError.fromThrowable(e)
        new DescribeConfigsResponseData.DescribeConfigsResult()
          .setResourceName(resource.resourceName)
          .setResourceType(resource.resourceType)
          .setErrorMessage(err.message)
          .setErrorCode(err.error.code)
          .setConfigs(Collections.emptyList[DescribeConfigsResponseData.DescribeConfigsResourceResult])
    }
  }
}