def alterConfig()

in core/src/main/scala/kafka/admin/ConfigCommand.scala [173:268]


  /**
   * Applies the config additions and deletions requested on the command line to the
   * entity identified by the first entity type / entity name in `opts`.
   *
   * The work is dispatched on the first entity type:
   *  - topic, client-metrics, broker and group entities are delegated to `alterResourceConfig`;
   *  - broker loggers are checked against the logger names returned by `getResourceConfig`
   *    and then altered through `Admin.incrementalAlterConfigs`;
   *  - users and clients are validated and then routed to either `alterQuotaConfigs` or
   *    `alterUserScramCredentialConfigs`;
   *  - ips are validated against `QuotaConfig.ipConfigs` and routed to `alterQuotaConfigs`.
   *
   * On success a completion message is printed to standard output.
   *
   * @param adminClient the admin client used to issue the alter requests
   * @param opts        parsed command line options supplying entity types, entity names and
   *                    the configs to add and delete
   * @throws IllegalArgumentException      if the entity type is unsupported or the requested
   *                                       config names are not permitted for the entity type
   * @throws InvalidConfigurationException if a named broker logger is not among the existing loggers
   * @throws UnsupportedVersionException   if the resource alteration fails with an
   *                                       `ExecutionException` caused by an `UnsupportedVersionException`
   * @throws IllegalStateException         if SCRAM credentials would be altered for anything other
   *                                       than exactly one entity name
   */
  def alterConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
    val entityTypes = opts.entityTypes
    val entityNames = opts.entityNames
    val entityTypeHead = entityTypes.head
    val entityNameHead = entityNames.head
    val configsToBeAddedMap = parseConfigsToBeAdded(opts).asScala.toMap // no need for mutability
    val configsToBeAdded = configsToBeAddedMap.map { case (k, v) => (k, new ConfigEntry(k, v)) }
    val configsToBeDeleted = parseConfigsToBeDeleted(opts)

    entityTypeHead match {
      case TopicType | ClientMetricsType | BrokerType | GroupType =>
        val configResourceType = entityTypeHead match {
          case TopicType => ConfigResource.Type.TOPIC
          case ClientMetricsType => ConfigResource.Type.CLIENT_METRICS
          case BrokerType => ConfigResource.Type.BROKER
          case GroupType => ConfigResource.Type.GROUP
          // Unreachable given the enclosing case, kept as a guard; reports the offending entity type.
          case _ => throw new IllegalArgumentException(s"$entityTypeHead is not a valid entity-type.")
        }
        try {
          alterResourceConfig(adminClient, entityTypeHead, entityNameHead, configsToBeDeleted, configsToBeAdded, configResourceType)
        } catch {
          // Only ExecutionException is intercepted; every other exception propagates unchanged.
          case e: ExecutionException =>
            e.getCause match {
              case _: UnsupportedVersionException =>
                throw new UnsupportedVersionException(s"The ${ApiKeys.INCREMENTAL_ALTER_CONFIGS} API is not supported by the cluster. The API is supported starting from version 2.3.0."
                + " You may want to use an older version of this tool to interact with your cluster, or upgrade your brokers to version 2.3.0 or newer to avoid this error.")
              case _ => throw e
            }
        }

      case BrokerLoggerConfigType =>
        val validLoggers = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = true, describeAll = false).map(_.name)
        // fail the command if any of the configured broker loggers do not exist
        val invalidBrokerLoggers = configsToBeDeleted.filterNot(validLoggers.contains) ++ configsToBeAdded.keys.filterNot(validLoggers.contains)
        if (invalidBrokerLoggers.nonEmpty)
          throw new InvalidConfigurationException(s"Invalid broker logger(s): ${invalidBrokerLoggers.mkString(",")}")

        val configResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityNameHead)
        val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
        val addEntries = configsToBeAdded.values.map(k => new AlterConfigOp(k, AlterConfigOp.OpType.SET))
        val deleteEntries = configsToBeDeleted.map(k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE))
        // Deletions are placed ahead of additions in the submitted collection.
        val alterEntries = (deleteEntries ++ addEntries).asJavaCollection
        adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)

      case UserType | ClientType =>
        val hasQuotaConfigsToAdd = configsToBeAdded.keys.exists(QuotaConfig.isClientOrUserQuotaConfig)
        val scramConfigsToAddMap = configsToBeAdded.filter(entry => ScramMechanism.isScram(entry._1))
        val unknownConfigsToAdd = configsToBeAdded.keys.filterNot(key => ScramMechanism.isScram(key) || QuotaConfig.isClientOrUserQuotaConfig(key))
        val hasQuotaConfigsToDelete = configsToBeDeleted.exists(QuotaConfig.isClientOrUserQuotaConfig)
        val scramConfigsToDelete = configsToBeDeleted.filter(ScramMechanism.isScram)
        val unknownConfigsToDelete = configsToBeDeleted.filterNot(key => ScramMechanism.isScram(key) || QuotaConfig.isClientOrUserQuotaConfig(key))
        if (entityTypeHead == ClientType || entityTypes.size == 2) { // size==2 for case where users is specified first on the command line, before clients
          // either just a client or both a user and a client
          if (unknownConfigsToAdd.nonEmpty || scramConfigsToAddMap.nonEmpty)
            throw new IllegalArgumentException(s"Only quota configs can be added for '$ClientType' using --bootstrap-server. Unexpected config names: ${unknownConfigsToAdd ++ scramConfigsToAddMap.keys}")
          if (unknownConfigsToDelete.nonEmpty || scramConfigsToDelete.nonEmpty)
            throw new IllegalArgumentException(s"Only quota configs can be deleted for '$ClientType' using --bootstrap-server. Unexpected config names: ${unknownConfigsToDelete ++ scramConfigsToDelete}")
        } else { // ConfigType.User
          if (unknownConfigsToAdd.nonEmpty)
            throw new IllegalArgumentException(s"Only quota and SCRAM credential configs can be added for '$UserType' using --bootstrap-server. Unexpected config names: $unknownConfigsToAdd")
          if (unknownConfigsToDelete.nonEmpty)
            throw new IllegalArgumentException(s"Only quota and SCRAM credential configs can be deleted for '$UserType' using --bootstrap-server. Unexpected config names: $unknownConfigsToDelete")
          if (scramConfigsToAddMap.nonEmpty || scramConfigsToDelete.nonEmpty) {
            if (entityNames.exists(_.isEmpty)) // either --entity-type users --entity-default or --user-defaults
              throw new IllegalArgumentException("The use of --entity-default or --user-defaults is not allowed with User SCRAM Credentials using --bootstrap-server.")
            if (hasQuotaConfigsToAdd || hasQuotaConfigsToDelete)
              throw new IllegalArgumentException(s"Cannot alter both quota and SCRAM credential configs simultaneously for '$UserType' using --bootstrap-server.")
          }
        }

        if (hasQuotaConfigsToAdd || hasQuotaConfigsToDelete) {
          alterQuotaConfigs(adminClient, entityTypes, entityNames, configsToBeAddedMap, configsToBeDeleted)
        } else {
          // handle altering user SCRAM credential configs
          if (entityNames.size != 1)
            // should never happen, if we get here then it is a bug
            throw new IllegalStateException(s"Altering user SCRAM credentials should never occur for zero or multiple users: $entityNames")
          alterUserScramCredentialConfigs(adminClient, entityNames.head, scramConfigsToAddMap, scramConfigsToDelete)
        }

      case IpType =>
        // Both added and deleted keys must be known connection quota config names.
        val unknownConfigs = (configsToBeAdded.keys ++ configsToBeDeleted).filterNot(key => QuotaConfig.ipConfigs.names.contains(key))
        if (unknownConfigs.nonEmpty)
          throw new IllegalArgumentException(s"Only connection quota configs can be added for '$IpType' using --bootstrap-server. Unexpected config names: ${unknownConfigs.mkString(",")}")
        alterQuotaConfigs(adminClient, entityTypes, entityNames, configsToBeAddedMap, configsToBeDeleted)

      case _ =>
        throw new IllegalArgumentException(s"Unsupported entity type: $entityTypeHead")
    }

    // An empty entity name means the default entity was targeted.
    if (entityNameHead.nonEmpty)
      System.out.println(s"Completed updating config for ${entityTypeHead.dropRight(1)} $entityNameHead.")
    else
      System.out.println(s"Completed updating default config for $entityTypeHead in the cluster.")
  }