in core/src/main/scala/kafka/admin/ConfigCommand.scala [173:268]
/**
 * Applies the config additions and deletions parsed from `opts` to the entity identified by
 * the first entity type and first entity name in `opts`, then prints a completion message
 * to standard output.
 *
 * The first entity type selects the code path:
 *  - `TopicType`, `ClientMetricsType`, `BrokerType`, `GroupType`: delegates to
 *    `alterResourceConfig` with the matching `ConfigResource.Type`.
 *  - `BrokerLoggerConfigType`: checks every requested logger name against the names returned by
 *    `getResourceConfig`, then issues an `incrementalAlterConfigs` request (deletes listed
 *    before sets) and waits up to 60 seconds for it to finish.
 *  - `UserType`, `ClientType`: validates that only quota configs (and, for a user-only entity,
 *    SCRAM credential configs) are present, then calls either `alterQuotaConfigs` or
 *    `alterUserScramCredentialConfigs`.
 *  - `IpType`: validates the config names against `QuotaConfig.ipConfigs` and calls
 *    `alterQuotaConfigs`.
 *
 * @param adminClient admin client used to issue the requests
 * @param opts        parsed command-line options providing entity types, entity names and the
 *                    configs to add and delete
 * @throws IllegalArgumentException      if the entity type is unsupported or a config name is not
 *                                       permitted for the entity type
 * @throws InvalidConfigurationException if a named broker logger is not among the valid loggers
 * @throws UnsupportedVersionException   if `alterResourceConfig` fails with an
 *                                       `ExecutionException` caused by an
 *                                       `UnsupportedVersionException`
 * @throws IllegalStateException         if the SCRAM credential path is reached with anything
 *                                       other than exactly one entity name
 */
def alterConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
  val entityTypes = opts.entityTypes
  val entityNames = opts.entityNames
  val entityTypeHead = entityTypes.head
  val entityNameHead = entityNames.head
  val configsToBeAddedMap = parseConfigsToBeAdded(opts).asScala.toMap // no need for mutability
  val configsToBeAdded = configsToBeAddedMap.map { case (k, v) => (k, new ConfigEntry(k, v)) }
  val configsToBeDeleted = parseConfigsToBeDeleted(opts)

  entityTypeHead match {
    case TopicType | ClientMetricsType | BrokerType | GroupType =>
      val configResourceType = entityTypeHead match {
        case TopicType => ConfigResource.Type.TOPIC
        case ClientMetricsType => ConfigResource.Type.CLIENT_METRICS
        case BrokerType => ConfigResource.Type.BROKER
        case GroupType => ConfigResource.Type.GROUP
        // Not reachable given the enclosing case; report the offending entity *type*.
        case _ => throw new IllegalArgumentException(s"$entityTypeHead is not a valid entity-type.")
      }
      try {
        alterResourceConfig(adminClient, entityTypeHead, entityNameHead, configsToBeDeleted, configsToBeAdded, configResourceType)
      } catch {
        // Only ExecutionException needs translating; every other exception propagates unchanged.
        case e: ExecutionException =>
          e.getCause match {
            case cause: UnsupportedVersionException =>
              val message = s"The ${ApiKeys.INCREMENTAL_ALTER_CONFIGS} API is not supported by the cluster. The API is supported starting from version 2.3.0." +
                " You may want to use an older version of this tool to interact with your cluster, or upgrade your brokers to version 2.3.0 or newer to avoid this error."
              // Chain the original failure so it is not lost when the friendlier message is raised.
              throw new UnsupportedVersionException(message, cause)
            case _ => throw e
          }
      }

    case BrokerLoggerConfigType =>
      val validLoggers = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = true, describeAll = false).map(_.name)
      // fail the command if any of the configured broker loggers do not exist
      val invalidBrokerLoggers = configsToBeDeleted.filterNot(validLoggers.contains) ++ configsToBeAdded.keys.filterNot(validLoggers.contains)
      if (invalidBrokerLoggers.nonEmpty)
        throw new InvalidConfigurationException(s"Invalid broker logger(s): ${invalidBrokerLoggers.mkString(",")}")
      val configResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityNameHead)
      val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
      val addEntries = configsToBeAdded.values.map(k => new AlterConfigOp(k, AlterConfigOp.OpType.SET))
      val deleteEntries = configsToBeDeleted.map(k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE))
      val alterEntries = (deleteEntries ++ addEntries).asJavaCollection
      adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)

    case UserType | ClientType =>
      // Classify every requested config name as quota, SCRAM, or unknown.
      val hasQuotaConfigsToAdd = configsToBeAdded.keys.exists(QuotaConfig.isClientOrUserQuotaConfig)
      val scramConfigsToAddMap = configsToBeAdded.filter(entry => ScramMechanism.isScram(entry._1))
      val unknownConfigsToAdd = configsToBeAdded.keys.filterNot(key => ScramMechanism.isScram(key) || QuotaConfig.isClientOrUserQuotaConfig(key))
      val hasQuotaConfigsToDelete = configsToBeDeleted.exists(QuotaConfig.isClientOrUserQuotaConfig)
      val scramConfigsToDelete = configsToBeDeleted.filter(ScramMechanism.isScram)
      val unknownConfigsToDelete = configsToBeDeleted.filterNot(key => ScramMechanism.isScram(key) || QuotaConfig.isClientOrUserQuotaConfig(key))
      if (entityTypeHead == ClientType || entityTypes.size == 2) { // size==2 for case where users is specified first on the command line, before clients
        // either just a client or both a user and a client
        if (unknownConfigsToAdd.nonEmpty || scramConfigsToAddMap.nonEmpty)
          throw new IllegalArgumentException(s"Only quota configs can be added for '$ClientType' using --bootstrap-server. Unexpected config names: ${unknownConfigsToAdd ++ scramConfigsToAddMap.keys}")
        if (unknownConfigsToDelete.nonEmpty || scramConfigsToDelete.nonEmpty)
          throw new IllegalArgumentException(s"Only quota configs can be deleted for '$ClientType' using --bootstrap-server. Unexpected config names: ${unknownConfigsToDelete ++ scramConfigsToDelete}")
      } else { // ConfigType.User
        if (unknownConfigsToAdd.nonEmpty)
          throw new IllegalArgumentException(s"Only quota and SCRAM credential configs can be added for '$UserType' using --bootstrap-server. Unexpected config names: $unknownConfigsToAdd")
        if (unknownConfigsToDelete.nonEmpty)
          throw new IllegalArgumentException(s"Only quota and SCRAM credential configs can be deleted for '$UserType' using --bootstrap-server. Unexpected config names: $unknownConfigsToDelete")
        if (scramConfigsToAddMap.nonEmpty || scramConfigsToDelete.nonEmpty) {
          if (entityNames.exists(_.isEmpty)) // either --entity-type users --entity-default or --user-defaults
            throw new IllegalArgumentException("The use of --entity-default or --user-defaults is not allowed with User SCRAM Credentials using --bootstrap-server.")
          if (hasQuotaConfigsToAdd || hasQuotaConfigsToDelete)
            throw new IllegalArgumentException(s"Cannot alter both quota and SCRAM credential configs simultaneously for '$UserType' using --bootstrap-server.")
        }
      }
      if (hasQuotaConfigsToAdd || hasQuotaConfigsToDelete) {
        alterQuotaConfigs(adminClient, entityTypes, entityNames, configsToBeAddedMap, configsToBeDeleted)
      } else {
        // handle altering user SCRAM credential configs
        if (entityNames.size != 1)
          // should never happen, if we get here then it is a bug
          throw new IllegalStateException(s"Altering user SCRAM credentials should never occur for zero or multiple users: $entityNames")
        alterUserScramCredentialConfigs(adminClient, entityNames.head, scramConfigsToAddMap, scramConfigsToDelete)
      }

    case IpType =>
      val unknownConfigs = (configsToBeAdded.keys ++ configsToBeDeleted).filterNot(key => QuotaConfig.ipConfigs.names.contains(key))
      if (unknownConfigs.nonEmpty)
        throw new IllegalArgumentException(s"Only connection quota configs can be added for '$IpType' using --bootstrap-server. Unexpected config names: ${unknownConfigs.mkString(",")}")
      alterQuotaConfigs(adminClient, entityTypes, entityNames, configsToBeAddedMap, configsToBeDeleted)

    case _ =>
      throw new IllegalArgumentException(s"Unsupported entity type: $entityTypeHead")
  }

  // An empty entity name means the default entity was targeted.
  if (entityNameHead.nonEmpty)
    System.out.println(s"Completed updating config for ${entityTypeHead.dropRight(1)} $entityNameHead.")
  else
    System.out.println(s"Completed updating default config for $entityTypeHead in the cluster.")
}