private def validateValues()

in core/src/main/scala/kafka/server/KafkaConfig.scala [583:732]


  private def validateValues(): Unit = {
    if (nodeId != brokerId) {
      throw new ConfigException(s"You must set `${KRaftConfigs.NODE_ID_CONFIG}` to the same value as `${ServerConfigs.BROKER_ID_CONFIG}`.")
    }
    require(logRollTimeMillis >= 1, "log.roll.ms must be greater than or equal to 1")
    require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0")
    require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, greater than or equal to 1")
    require(logDirs.nonEmpty, "At least one log directory must be defined via log.dirs or log.dir.")
    require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.")
    require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs, "replica.socket.timeout.ms should always be at least replica.fetch.wait.max.ms" +
      " to prevent unnecessary socket timeouts")
    require(replicaFetchWaitMaxMs <= replicaLagTimeMaxMs, "replica.fetch.wait.max.ms should always be less than or equal to replica.lag.time.max.ms" +
      " to prevent frequent changes in ISR")

    val advertisedBrokerListenerNames = effectiveAdvertisedBrokerListeners.map(l => ListenerName.normalised(l.listener)).toSet

    // validate KRaft-related configs
    val voterIds = QuorumConfig.parseVoterIds(quorumConfig.voters)
    def validateQuorumVotersAndQuorumBootstrapServerForKRaft(): Unit = {
      if (voterIds.isEmpty && quorumConfig.bootstrapServers.isEmpty) {
        throw new ConfigException(
          s"""If using ${KRaftConfigs.PROCESS_ROLES_CONFIG}, either ${QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG} must
          |contain the set of bootstrap controllers or ${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable
          |set of controllers.""".stripMargin.replace("\n", " ")
        )
      }
    }

    def validateControllerQuorumVotersMustContainNodeIdForKRaftController(): Unit = {
      require(voterIds.isEmpty || voterIds.contains(nodeId),
        s"If ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${QuorumConfig.QUORUM_VOTERS_CONFIG}=${voterIds.asScala.toSet}")
    }
    def validateAdvertisedControllerListenersNonEmptyForKRaftController(): Unit = {
      require(effectiveAdvertisedControllerListeners.nonEmpty,
        s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must contain at least one value appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}' configuration when running the KRaft controller role")
    }
    def validateControllerListenerNamesMustAppearInListenersForKRaftController(): Unit = {
      val listenerNameValues = listeners.map(_.listener).toSet
      require(controllerListenerNames.forall(cln => listenerNameValues.contains(cln)),
        s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must only contain values appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}' configuration when running the KRaft controller role")
    }
    def validateAdvertisedBrokerListenersNonEmptyForBroker(): Unit = {
      require(advertisedBrokerListenerNames.nonEmpty,
        "There must be at least one broker advertised listener." + (
          if (processRoles.contains(ProcessRole.BrokerRole)) s" Perhaps all listeners appear in ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG}?" else ""))
    }
    def warnIfConfigDefinedInWrongRole(expectedRole: ProcessRole, configName: String): Unit = {
      if (originals.containsKey(configName)) {
        warn(s"$configName is defined in ${processRoles.mkString(", ")}. It should be defined in the $expectedRole role.")
      }
    }
    if (processRoles == Set(ProcessRole.BrokerRole)) {
      // KRaft broker-only
      validateQuorumVotersAndQuorumBootstrapServerForKRaft()
      // nodeId must not appear in controller.quorum.voters
      require(!voterIds.contains(nodeId),
        s"If ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains just the 'broker' role, the node id $nodeId must not be included in the set of voters ${QuorumConfig.QUORUM_VOTERS_CONFIG}=${voterIds.asScala.toSet}")
      // controller.listener.names must be non-empty...
      require(controllerListenerNames.nonEmpty,
        s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must contain at least one value when running KRaft with just the broker role")
      // controller.listener.names are forbidden in listeners...
      require(controllerListeners.isEmpty,
        s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must not contain a value appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}' configuration when running KRaft with just the broker role")
      // controller.listener.names must all appear in listener.security.protocol.map
      controllerListenerNames.foreach { name =>
        val listenerName = ListenerName.normalised(name)
        if (!effectiveListenerSecurityProtocolMap.contains(listenerName)) {
          throw new ConfigException(s"Controller listener with name ${listenerName.value} defined in " +
            s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} not found in ${SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG}  (an explicit security mapping for each controller listener is required if ${SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG} is non-empty, or if there are security protocols other than PLAINTEXT in use)")
        }
      }
      // warn that only the first controller listener is used if there is more than one
      if (controllerListenerNames.size > 1) {
        warn(s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} has multiple entries; only the first will be used since ${KRaftConfigs.PROCESS_ROLES_CONFIG}=broker: ${controllerListenerNames.asJava}")
      }
      // warn if create.topic.policy.class.name or alter.config.policy.class.name is defined in the broker role
      warnIfConfigDefinedInWrongRole(ProcessRole.ControllerRole, ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG)
      warnIfConfigDefinedInWrongRole(ProcessRole.ControllerRole, ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG)
    } else if (processRoles == Set(ProcessRole.ControllerRole)) {
      // KRaft controller-only
      validateQuorumVotersAndQuorumBootstrapServerForKRaft()
      // listeners should only contain listeners also enumerated in the controller listener
      require(
        effectiveAdvertisedControllerListeners.size == listeners.size,
        s"The ${SocketServerConfigs.LISTENERS_CONFIG} config must only contain KRaft controller listeners from ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} when ${KRaftConfigs.PROCESS_ROLES_CONFIG}=controller"
      )
      // controller.listener.names must not contain inter.broker.listener.name when inter.broker.listener.name is explicitly set
      if (Option(getString(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG)).isDefined) {
        require(
          !controllerListenerNames.contains(interBrokerListenerName.value()),
          s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must not contain an explicitly set ${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} configuration value when ${KRaftConfigs.PROCESS_ROLES_CONFIG}=controller'"
        )
      }
      validateControllerQuorumVotersMustContainNodeIdForKRaftController()
      validateAdvertisedControllerListenersNonEmptyForKRaftController()
      validateControllerListenerNamesMustAppearInListenersForKRaftController()
    } else if (isKRaftCombinedMode) {
      // KRaft combined broker and controller
      validateQuorumVotersAndQuorumBootstrapServerForKRaft()
      validateControllerQuorumVotersMustContainNodeIdForKRaftController()
      validateAdvertisedControllerListenersNonEmptyForKRaftController()
      validateControllerListenerNamesMustAppearInListenersForKRaftController()
    }

    val listenerNames = listeners.map(l => ListenerName.normalised(l.listener)).toSet
    if (processRoles.contains(ProcessRole.BrokerRole)) {
      validateAdvertisedBrokerListenersNonEmptyForBroker()
      require(advertisedBrokerListenerNames.contains(interBrokerListenerName),
        s"${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " +
          s"The valid options based on currently configured listeners are ${advertisedBrokerListenerNames.map(_.value).mkString(",")}")
      require(advertisedBrokerListenerNames.subsetOf(listenerNames),
        s"${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG} listener names must be equal to or a subset of the ones defined in ${SocketServerConfigs.LISTENERS_CONFIG}. " +
          s"Found ${advertisedBrokerListenerNames.map(_.value).mkString(",")}. The valid options based on the current configuration " +
          s"are ${listenerNames.map(_.value).mkString(",")}"
      )
    }

    require(!effectiveAdvertisedBrokerListeners.exists(endpoint => endpoint.host=="0.0.0.0"),
      s"${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG} cannot use the nonroutable meta-address 0.0.0.0. "+
      s"Use a routable IP address.")

    require(!effectiveAdvertisedControllerListeners.exists(endpoint => endpoint.host=="0.0.0.0"),
      s"${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG} cannot use the nonroutable meta-address 0.0.0.0. "+
      s"Use a routable IP address.")

    val interBrokerUsesSasl = interBrokerSecurityProtocol == SecurityProtocol.SASL_PLAINTEXT || interBrokerSecurityProtocol == SecurityProtocol.SASL_SSL
    require(!interBrokerUsesSasl || saslEnabledMechanisms(interBrokerListenerName).contains(saslMechanismInterBrokerProtocol),
      s"${BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG} must be included in ${BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG} when SASL is used for inter-broker communication")
    require(queuedMaxBytes <= 0 || queuedMaxBytes >= socketRequestMaxBytes,
      s"${SocketServerConfigs.QUEUED_MAX_BYTES_CONFIG} must be larger or equal to ${SocketServerConfigs.SOCKET_REQUEST_MAX_BYTES_CONFIG}")

    if (maxConnectionsPerIp == 0)
      require(maxConnectionsPerIpOverrides.nonEmpty, s"${SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG} can be set to zero only if" +
        s" ${SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG} property is set.")

    val invalidAddresses = maxConnectionsPerIpOverrides.keys.filterNot(address => Utils.validHostPattern(address))
    if (invalidAddresses.nonEmpty)
      throw new IllegalArgumentException(s"${SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG} contains invalid addresses : ${invalidAddresses.mkString(",")}")

    if (connectionsMaxIdleMs >= 0)
      require(failedAuthenticationDelayMs < connectionsMaxIdleMs,
        s"${SocketServerConfigs.FAILED_AUTHENTICATION_DELAY_MS_CONFIG}=$failedAuthenticationDelayMs should always be less than" +
        s" ${SocketServerConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG}=$connectionsMaxIdleMs to prevent failed" +
        s" authentication responses from timing out")

    val principalBuilderClass = getClass(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG)
    require(principalBuilderClass != null, s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must be non-null")
    require(classOf[KafkaPrincipalSerde].isAssignableFrom(principalBuilderClass),
      s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must implement KafkaPrincipalSerde")
  }