def describeConfigs()

in core/src/main/scala/kafka/server/ConfigHelper.scala [84:177]


  def describeConfigs(resourceToConfigNames: List[DescribeConfigsResource],
                      includeSynonyms: Boolean,
                      includeDocumentation: Boolean): List[DescribeConfigsResponseData.DescribeConfigsResult] = {
    resourceToConfigNames.map { resource =>

      def createResponseConfig(configs: Map[String, Any],
                               createConfigEntry: (String, Any) => DescribeConfigsResponseData.DescribeConfigsResourceResult): DescribeConfigsResponseData.DescribeConfigsResult = {
        val filteredConfigPairs = if (resource.configurationKeys == null || resource.configurationKeys.isEmpty)
          configs.toBuffer
        else
          configs.filter { case (configName, _) =>
            resource.configurationKeys.asScala.contains(configName)
          }.toBuffer

        val configEntries = filteredConfigPairs.map { case (name, value) => createConfigEntry(name, value) }
        new DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(Errors.NONE.code)
          .setConfigs(configEntries.asJava)
      }

      try {
        val configResult = ConfigResource.Type.forId(resource.resourceType) match {
          case ConfigResource.Type.TOPIC =>
            val topic = resource.resourceName
            Topic.validate(topic)
            if (metadataCache.contains(topic)) {
              val topicProps = configRepository.topicConfig(topic)
              val logConfig = LogConfig.fromProps(config.extractLogConfigMap, topicProps)
              createResponseConfig(allConfigs(logConfig), createTopicConfigEntry(logConfig, topicProps, includeSynonyms, includeDocumentation))
            } else {
              new DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
                .setConfigs(Collections.emptyList[DescribeConfigsResponseData.DescribeConfigsResourceResult])
            }

          case ConfigResource.Type.BROKER =>
            if (resource.resourceName == null || resource.resourceName.isEmpty)
              createResponseConfig(config.dynamicConfig.currentDynamicDefaultConfigs,
                createBrokerConfigEntry(perBrokerConfig = false, includeSynonyms, includeDocumentation))
            else if (resourceNameToBrokerId(resource.resourceName) == config.brokerId)
              createResponseConfig(allConfigs(config),
                createBrokerConfigEntry(perBrokerConfig = true, includeSynonyms, includeDocumentation))
            else
              throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} or empty string, but received ${resource.resourceName}")

          case ConfigResource.Type.BROKER_LOGGER =>
            if (resource.resourceName == null || resource.resourceName.isEmpty)
              throw new InvalidRequestException("Broker id must not be empty")
            else if (resourceNameToBrokerId(resource.resourceName) != config.brokerId)
              throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} but received ${resource.resourceName}")
            else
              createResponseConfig(LoggingController.loggers,
                (name, value) => new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name)
                  .setValue(value.toString).setConfigSource(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG.id)
                  .setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava))

          case ConfigResource.Type.CLIENT_METRICS =>
            if (resource.resourceName == null || resource.resourceName.isEmpty) {
              throw new InvalidRequestException("Client metrics subscription name must not be empty")
            } else {
              val clientMetricsProps = configRepository.config(new ConfigResource(ConfigResource.Type.CLIENT_METRICS, resource.resourceName))
              val clientMetricsConfig = ClientMetricsConfigs.fromProps(ClientMetricsConfigs.defaultConfigsMap(), clientMetricsProps)
              createResponseConfig(allConfigs(clientMetricsConfig), createClientMetricsConfigEntry(clientMetricsConfig, clientMetricsProps, includeSynonyms, includeDocumentation))
            }

          case ConfigResource.Type.GROUP =>
            val group = resource.resourceName
            if (group == null || group.isEmpty) {
              throw new InvalidRequestException("Group name must not be empty")
            } else {
              val groupProps = configRepository.groupConfig(group)
              val groupConfig = GroupConfig.fromProps(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig), groupProps)
              createResponseConfig(allConfigs(groupConfig), createGroupConfigEntry(groupConfig, groupProps, includeSynonyms, includeDocumentation))
            }

          case resourceType => throw new InvalidRequestException(s"Unsupported resource type: $resourceType")
        }
        configResult.setResourceName(resource.resourceName).setResourceType(resource.resourceType)
      } catch {
        case e: Throwable =>
          // Log client errors at a lower level than unexpected exceptions
          val message = s"Error processing describe configs request for resource $resource"
          if (e.isInstanceOf[ApiException])
            info(message, e)
          else
            error(message, e)
          val err = ApiError.fromThrowable(e)
          new DescribeConfigsResponseData.DescribeConfigsResult()
            .setResourceName(resource.resourceName)
            .setResourceType(resource.resourceType)
            .setErrorMessage(err.message)
            .setErrorCode(err.error.code)
            .setConfigs(Collections.emptyList[DescribeConfigsResponseData.DescribeConfigsResourceResult])
      }
    }
  }