in core/src/main/scala/kafka/server/KafkaConfig.scala [583:732]
/**
 * Cross-checks the parsed configuration values against each other and fails on the first
 * violated constraint.
 *
 * The checks run in a fixed order, so when several constraints are violated only the earliest
 * one is reported:
 *  1. node id / broker id agreement,
 *  1. log, log-cleaner and replica-fetch timing constraints,
 *  1. KRaft role-specific constraints (broker-only, controller-only, combined),
 *  1. broker advertised-listener constraints,
 *  1. SASL, socket-server, connection-limit and principal-builder constraints.
 *
 * @throws ConfigException          if the node id differs from the broker id, if neither quorum voters
 *                                  nor quorum bootstrap servers are configured for a KRaft role, or if a
 *                                  broker-only node names a controller listener that has no entry in the
 *                                  effective listener security protocol map
 * @throws IllegalArgumentException for every constraint enforced through `require`, and when the
 *                                  per-IP connection overrides contain an invalid address
 */
private def validateValues(): Unit = {
  if (nodeId != brokerId) {
    throw new ConfigException(s"You must set `${KRaftConfigs.NODE_ID_CONFIG}` to the same value as `${ServerConfigs.BROKER_ID_CONFIG}`.")
  }

  // ---- log, log-cleaner and replication timing constraints ----
  require(logRollTimeMillis >= 1, "log.roll.ms must be greater than or equal to 1")
  require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0")
  require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, greater than or equal to 1")
  require(logDirs.nonEmpty, "At least one log directory must be defined via log.dirs or log.dir.")
  // NOTE(review): the comparison is strict, so exactly 1 MiB per thread is rejected even though the
  // message says "at least". This also assumes logCleanerThreads is non-zero - TODO confirm the
  // config definition guarantees that, otherwise the division throws before the message is shown.
  require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.")
  require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs, "replica.socket.timeout.ms should always be at least replica.fetch.wait.max.ms" +
    " to prevent unnecessary socket timeouts")
  require(replicaFetchWaitMaxMs <= replicaLagTimeMaxMs, "replica.fetch.wait.max.ms should always be less than or equal to replica.lag.time.max.ms" +
    " to prevent frequent changes in ISR")

  // Normalised names of the advertised broker listeners; reused by the broker checks further down.
  val advertisedBrokerListenerNames = effectiveAdvertisedBrokerListeners.map(l => ListenerName.normalised(l.listener)).toSet

  // validate KRaft-related configs
  val voterIds = QuorumConfig.parseVoterIds(quorumConfig.voters)

  // Fails unless at least one of the voter set or the bootstrap server list is non-empty.
  def validateQuorumVotersAndQuorumBootstrapServerForKRaft(): Unit = {
    if (voterIds.isEmpty && quorumConfig.bootstrapServers.isEmpty) {
      throw new ConfigException(
        s"""If using ${KRaftConfigs.PROCESS_ROLES_CONFIG}, either ${QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG} must
          |contain the set of bootstrap controllers or ${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable
          |set of controllers.""".stripMargin.replace("\n", " ")
      )
    }
  }

  // An empty voter set is accepted; a non-empty one must list this node.
  def validateControllerQuorumVotersMustContainNodeIdForKRaftController(): Unit = {
    require(voterIds.isEmpty || voterIds.contains(nodeId),
      s"If ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${QuorumConfig.QUORUM_VOTERS_CONFIG}=${voterIds.asScala.toSet}")
  }

  def validateAdvertisedControllerListenersNonEmptyForKRaftController(): Unit = {
    require(effectiveAdvertisedControllerListeners.nonEmpty,
      s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must contain at least one value appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}' configuration when running the KRaft controller role")
  }

  // Compares the raw (non-normalised) listener names against the controller listener names.
  def validateControllerListenerNamesMustAppearInListenersForKRaftController(): Unit = {
    val listenerNameValues = listeners.map(_.listener).toSet
    require(controllerListenerNames.forall(cln => listenerNameValues.contains(cln)),
      s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must only contain values appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}' configuration when running the KRaft controller role")
  }

  def validateAdvertisedBrokerListenersNonEmptyForBroker(): Unit = {
    require(advertisedBrokerListenerNames.nonEmpty,
      "There must be at least one broker advertised listener." + (
        if (processRoles.contains(ProcessRole.BrokerRole)) s" Perhaps all listeners appear in ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG}?" else ""))
  }

  // Logs a warning (does not fail) when `configName` was explicitly supplied by the user.
  def warnIfConfigDefinedInWrongRole(expectedRole: ProcessRole, configName: String): Unit = {
    if (originals.containsKey(configName)) {
      warn(s"$configName is defined in ${processRoles.mkString(", ")}. It should be defined in the $expectedRole role.")
    }
  }

  // ---- role-specific KRaft constraints ----
  if (processRoles == Set(ProcessRole.BrokerRole)) {
    // KRaft broker-only
    validateQuorumVotersAndQuorumBootstrapServerForKRaft()
    // nodeId must not appear in controller.quorum.voters
    require(!voterIds.contains(nodeId),
      s"If ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains just the 'broker' role, the node id $nodeId must not be included in the set of voters ${QuorumConfig.QUORUM_VOTERS_CONFIG}=${voterIds.asScala.toSet}")
    // controller.listener.names must be non-empty...
    require(controllerListenerNames.nonEmpty,
      s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must contain at least one value when running KRaft with just the broker role")
    // controller.listener.names are forbidden in listeners...
    require(controllerListeners.isEmpty,
      s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must not contain a value appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}' configuration when running KRaft with just the broker role")
    // controller.listener.names must all appear in listener.security.protocol.map
    controllerListenerNames.foreach { name =>
      val listenerName = ListenerName.normalised(name)
      if (!effectiveListenerSecurityProtocolMap.contains(listenerName)) {
        throw new ConfigException(s"Controller listener with name ${listenerName.value} defined in " +
          s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} not found in ${SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG} (an explicit security mapping for each controller listener is required if ${SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG} is non-empty, or if there are security protocols other than PLAINTEXT in use)")
      }
    }
    // warn that only the first controller listener is used if there is more than one
    if (controllerListenerNames.size > 1) {
      warn(s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} has multiple entries; only the first will be used since ${KRaftConfigs.PROCESS_ROLES_CONFIG}=broker: ${controllerListenerNames.asJava}")
    }
    // warn if create.topic.policy.class.name or alter.config.policy.class.name is defined in the broker role
    warnIfConfigDefinedInWrongRole(ProcessRole.ControllerRole, ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG)
    warnIfConfigDefinedInWrongRole(ProcessRole.ControllerRole, ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG)
  } else if (processRoles == Set(ProcessRole.ControllerRole)) {
    // KRaft controller-only
    validateQuorumVotersAndQuorumBootstrapServerForKRaft()
    // listeners should only contain listeners also enumerated in the controller listener
    // (enforced by comparing the two collection sizes)
    require(
      effectiveAdvertisedControllerListeners.size == listeners.size,
      s"The ${SocketServerConfigs.LISTENERS_CONFIG} config must only contain KRaft controller listeners from ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} when ${KRaftConfigs.PROCESS_ROLES_CONFIG}=controller"
    )
    // controller.listener.names must not contain inter.broker.listener.name when inter.broker.listener.name is explicitly set
    if (Option(getString(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG)).isDefined) {
      require(
        !controllerListenerNames.contains(interBrokerListenerName.value()),
        s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must not contain an explicitly set ${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} configuration value when ${KRaftConfigs.PROCESS_ROLES_CONFIG}=controller"
      )
    }
    validateControllerQuorumVotersMustContainNodeIdForKRaftController()
    validateAdvertisedControllerListenersNonEmptyForKRaftController()
    validateControllerListenerNamesMustAppearInListenersForKRaftController()
  } else if (isKRaftCombinedMode) {
    // KRaft combined broker and controller
    validateQuorumVotersAndQuorumBootstrapServerForKRaft()
    validateControllerQuorumVotersMustContainNodeIdForKRaftController()
    validateAdvertisedControllerListenersNonEmptyForKRaftController()
    validateControllerListenerNamesMustAppearInListenersForKRaftController()
  }

  // ---- broker advertised-listener constraints ----
  val listenerNames = listeners.map(l => ListenerName.normalised(l.listener)).toSet
  if (processRoles.contains(ProcessRole.BrokerRole)) {
    validateAdvertisedBrokerListenersNonEmptyForBroker()
    require(advertisedBrokerListenerNames.contains(interBrokerListenerName),
      s"${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " +
        s"The valid options based on currently configured listeners are ${advertisedBrokerListenerNames.map(_.value).mkString(",")}")
    require(advertisedBrokerListenerNames.subsetOf(listenerNames),
      s"${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG} listener names must be equal to or a subset of the ones defined in ${SocketServerConfigs.LISTENERS_CONFIG}. " +
        s"Found ${advertisedBrokerListenerNames.map(_.value).mkString(",")}. The valid options based on the current configuration " +
        s"are ${listenerNames.map(_.value).mkString(",")}"
    )
  }

  // The 0.0.0.0 meta-address is rejected for both broker and controller advertised listeners;
  // both checks report the violation with the same message.
  require(!effectiveAdvertisedBrokerListeners.exists(endpoint => endpoint.host=="0.0.0.0"),
    s"${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG} cannot use the nonroutable meta-address 0.0.0.0. "+
      s"Use a routable IP address.")
  require(!effectiveAdvertisedControllerListeners.exists(endpoint => endpoint.host=="0.0.0.0"),
    s"${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG} cannot use the nonroutable meta-address 0.0.0.0. "+
      s"Use a routable IP address.")

  // ---- SASL and socket-server constraints ----
  val interBrokerUsesSasl = interBrokerSecurityProtocol == SecurityProtocol.SASL_PLAINTEXT || interBrokerSecurityProtocol == SecurityProtocol.SASL_SSL
  require(!interBrokerUsesSasl || saslEnabledMechanisms(interBrokerListenerName).contains(saslMechanismInterBrokerProtocol),
    s"${BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG} must be included in ${BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG} when SASL is used for inter-broker communication")
  // A non-positive queuedMaxBytes skips the comparison against socketRequestMaxBytes.
  require(queuedMaxBytes <= 0 || queuedMaxBytes >= socketRequestMaxBytes,
    s"${SocketServerConfigs.QUEUED_MAX_BYTES_CONFIG} must be larger or equal to ${SocketServerConfigs.SOCKET_REQUEST_MAX_BYTES_CONFIG}")
  if (maxConnectionsPerIp == 0)
    require(maxConnectionsPerIpOverrides.nonEmpty, s"${SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG} can be set to zero only if" +
      s" ${SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG} property is set.")
  val invalidAddresses = maxConnectionsPerIpOverrides.keys.filterNot(address => Utils.validHostPattern(address))
  if (invalidAddresses.nonEmpty)
    throw new IllegalArgumentException(s"${SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG} contains invalid addresses : ${invalidAddresses.mkString(",")}")
  // The delay/idle comparison is skipped when connectionsMaxIdleMs is negative.
  if (connectionsMaxIdleMs >= 0)
    require(failedAuthenticationDelayMs < connectionsMaxIdleMs,
      s"${SocketServerConfigs.FAILED_AUTHENTICATION_DELAY_MS_CONFIG}=$failedAuthenticationDelayMs should always be less than" +
        s" ${SocketServerConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG}=$connectionsMaxIdleMs to prevent failed" +
        s" authentication responses from timing out")

  // ---- principal builder constraints ----
  val principalBuilderClass = getClass(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG)
  require(principalBuilderClass != null, s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must be non-null")
  require(classOf[KafkaPrincipalSerde].isAssignableFrom(principalBuilderClass),
    s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must implement KafkaPrincipalSerde")
}