in core/src/main/scala/kafka/server/KafkaConfig.scala [1460:1588]
// The two connection limits are `def`s, so each call looks the value up again; the idle timeout and
// the failed-authentication delay are `val`s and are read once when this object is constructed.
// NOTE(review): the def/val split presumably reflects which settings may change at runtime — confirm.
def maxConnections = getInt(KafkaConfig.MaxConnectionsProp)
def maxConnectionCreationRate = getInt(KafkaConfig.MaxConnectionCreationRateProp)
val connectionsMaxIdleMs = getLong(KafkaConfig.ConnectionsMaxIdleMsProp)
val failedAuthenticationDelayMs = getInt(KafkaConfig.FailedAuthenticationDelayMsProp)
/***************** rack configuration **************/
// Both strings are wrapped in `Option`, so a null value is exposed as `None` instead of null.
val rack = Option(getString(KafkaConfig.RackProp))
val replicaSelectorClassName = Option(getString(KafkaConfig.ReplicaSelectorClassProp))
/** ********* Log Configuration ***********/
// Members declared with `val` are read once when this object is constructed; members declared with
// `def` look the value up again on every call.
// NOTE(review): the `def`s presumably correspond to settings that may change at runtime — confirm.
val autoCreateTopicsEnable = getBoolean(KafkaConfig.AutoCreateTopicsEnableProp)
val numPartitions = getInt(KafkaConfig.NumPartitionsProp)
// Uses the LogDirsProp string when it is non-null, otherwise falls back to LogDirProp; the chosen
// string is then split by CoreUtils.parseCsvList.
val logDirs = CoreUtils.parseCsvList(Option(getString(KafkaConfig.LogDirsProp)).getOrElse(getString(KafkaConfig.LogDirProp)))
def logSegmentBytes = getInt(KafkaConfig.LogSegmentBytesProp)
def logFlushIntervalMessages = getLong(KafkaConfig.LogFlushIntervalMessagesProp)
val logCleanerThreads = getInt(KafkaConfig.LogCleanerThreadsProp)
def numRecoveryThreadsPerDataDir = getInt(KafkaConfig.NumRecoveryThreadsPerDataDirProp)
val logFlushSchedulerIntervalMs = getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp)
// The two checkpoint intervals are read with getInt and widened to Long via `.toLong`.
val logFlushOffsetCheckpointIntervalMs = getInt(KafkaConfig.LogFlushOffsetCheckpointIntervalMsProp).toLong
val logFlushStartOffsetCheckpointIntervalMs = getInt(KafkaConfig.LogFlushStartOffsetCheckpointIntervalMsProp).toLong
val logCleanupIntervalMs = getLong(KafkaConfig.LogCleanupIntervalMsProp)
def logCleanupPolicy = getList(KafkaConfig.LogCleanupPolicyProp)
val offsetsRetentionMinutes = getInt(KafkaConfig.OffsetsRetentionMinutesProp)
val offsetsRetentionCheckIntervalMs = getLong(KafkaConfig.OffsetsRetentionCheckIntervalMsProp)
def logRetentionBytes = getLong(KafkaConfig.LogRetentionBytesProp)
// Log cleaner settings.
val logCleanerDedupeBufferSize = getLong(KafkaConfig.LogCleanerDedupeBufferSizeProp)
val logCleanerDedupeBufferLoadFactor = getDouble(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp)
val logCleanerIoBufferSize = getInt(KafkaConfig.LogCleanerIoBufferSizeProp)
val logCleanerIoMaxBytesPerSecond = getDouble(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp)
def logCleanerDeleteRetentionMs = getLong(KafkaConfig.LogCleanerDeleteRetentionMsProp)
def logCleanerMinCompactionLagMs = getLong(KafkaConfig.LogCleanerMinCompactionLagMsProp)
def logCleanerMaxCompactionLagMs = getLong(KafkaConfig.LogCleanerMaxCompactionLagMsProp)
val logCleanerBackoffMs = getLong(KafkaConfig.LogCleanerBackoffMsProp)
def logCleanerMinCleanRatio = getDouble(KafkaConfig.LogCleanerMinCleanRatioProp)
val logCleanerEnable = getBoolean(KafkaConfig.LogCleanerEnableProp)
// Index and deletion settings.
def logIndexSizeMaxBytes = getInt(KafkaConfig.LogIndexSizeMaxBytesProp)
def logIndexIntervalBytes = getInt(KafkaConfig.LogIndexIntervalBytesProp)
def logDeleteDelayMs = getLong(KafkaConfig.LogDeleteDelayMsProp)
// Log roll time in milliseconds: the LogRollTimeMillisProp value when it is non-null, otherwise
// the LogRollTimeHoursProp value converted from hours to milliseconds.
def logRollTimeMillis: java.lang.Long = {
  val millisOverride = Option(getLong(KafkaConfig.LogRollTimeMillisProp))
  millisOverride match {
    case Some(millis) => millis
    // The hours-based setting is only consulted when no millisecond value is present.
    case None => 60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeHoursProp)
  }
}
// Log roll jitter in milliseconds: the LogRollTimeJitterMillisProp value when it is non-null,
// otherwise the LogRollTimeJitterHoursProp value converted from hours to milliseconds.
def logRollTimeJitterMillis: java.lang.Long = {
  val jitterMillisOverride = Option(getLong(KafkaConfig.LogRollTimeJitterMillisProp))
  jitterMillisOverride match {
    case Some(jitterMillis) => jitterMillis
    // The hours-based setting is only consulted when no millisecond value is present.
    case None => 60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeJitterHoursProp)
  }
}
// Log flush interval in milliseconds: the LogFlushIntervalMsProp value when it is non-null,
// otherwise the LogFlushSchedulerIntervalMsProp value.
def logFlushIntervalMs: java.lang.Long = {
  val flushIntervalOverride = Option(getLong(KafkaConfig.LogFlushIntervalMsProp))
  flushIntervalOverride match {
    case Some(intervalMs) => intervalMs
    // The scheduler interval is only looked up when no explicit flush interval is present.
    case None => getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp)
  }
}
// These `def`s look the value up again on every call.
def minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp)
def logPreAllocateEnable: java.lang.Boolean = getBoolean(KafkaConfig.LogPreAllocateProp)
// We keep the user-provided String as `ApiVersion.apply` can choose a slightly different version (e.g. if `0.10.0`
// is passed, `0.10.0-IV0` may be picked)
val logMessageFormatVersionString = getString(KafkaConfig.LogMessageFormatVersionProp)
// Parsed from the string above; both are `val`s, so the parse happens once at construction.
val logMessageFormatVersion = ApiVersion(logMessageFormatVersionString)
// The configured name is resolved through TimestampType.forName on every call.
def logMessageTimestampType = TimestampType.forName(getString(KafkaConfig.LogMessageTimestampTypeProp))
// Declared with explicit `Long` / `Boolean` result types, unlike the `java.lang.Boolean` accessor above.
def logMessageTimestampDifferenceMaxMs: Long = getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)
def logMessageDownConversionEnable: Boolean = getBoolean(KafkaConfig.LogMessageDownConversionEnableProp)
/** ********* Replication configuration ***********/
// All members here are `val`s (read once at construction) except `numReplicaFetchers` and
// `uncleanLeaderElectionEnable`, which are `def`s and are looked up on every call.
// The first two carry an explicit `Int` result type; the rest rely on type inference.
val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
val defaultReplicationFactor: Int = getInt(KafkaConfig.DefaultReplicationFactorProp)
val replicaLagTimeMaxMs = getLong(KafkaConfig.ReplicaLagTimeMaxMsProp)
val replicaSocketTimeoutMs = getInt(KafkaConfig.ReplicaSocketTimeoutMsProp)
val replicaSocketReceiveBufferBytes = getInt(KafkaConfig.ReplicaSocketReceiveBufferBytesProp)
val replicaFetchMaxBytes = getInt(KafkaConfig.ReplicaFetchMaxBytesProp)
val replicaFetchWaitMaxMs = getInt(KafkaConfig.ReplicaFetchWaitMaxMsProp)
val replicaFetchMinBytes = getInt(KafkaConfig.ReplicaFetchMinBytesProp)
val replicaFetchResponseMaxBytes = getInt(KafkaConfig.ReplicaFetchResponseMaxBytesProp)
val replicaFetchBackoffMs = getInt(KafkaConfig.ReplicaFetchBackoffMsProp)
def numReplicaFetchers = getInt(KafkaConfig.NumReplicaFetchersProp)
val replicaHighWatermarkCheckpointIntervalMs = getLong(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp)
// Purgatory purge intervals.
val fetchPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp)
val producerPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp)
val deleteRecordsPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.DeleteRecordsPurgatoryPurgeIntervalRequestsProp)
// Leader balancing and election settings.
val autoLeaderRebalanceEnable = getBoolean(KafkaConfig.AutoLeaderRebalanceEnableProp)
val leaderImbalancePerBrokerPercentage = getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp)
val leaderImbalanceCheckIntervalSeconds = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)
def uncleanLeaderElectionEnable: java.lang.Boolean = getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp)
// We keep the user-provided String as `ApiVersion.apply` can choose a slightly different version (e.g. if `0.10.0`
// is passed, `0.10.0-IV0` may be picked)
val interBrokerProtocolVersionString = getString(KafkaConfig.InterBrokerProtocolVersionProp)
// Parsed from the string above once, at construction.
val interBrokerProtocolVersion = ApiVersion(interBrokerProtocolVersionString)
/** ********* Controlled shutdown configuration ***********/
// All three are `val`s, read once when this object is constructed.
val controlledShutdownMaxRetries = getInt(KafkaConfig.ControlledShutdownMaxRetriesProp)
val controlledShutdownRetryBackoffMs = getLong(KafkaConfig.ControlledShutdownRetryBackoffMsProp)
val controlledShutdownEnable = getBoolean(KafkaConfig.ControlledShutdownEnableProp)
/** ********* Feature configuration ***********/
// True when `interBrokerProtocolVersion` compares greater than or equal to KAFKA_2_7_IV0.
def isFeatureVersioningEnabled = interBrokerProtocolVersion >= KAFKA_2_7_IV0
/** ********* Group coordinator configuration ***********/
// All `val`s, read once when this object is constructed.
val groupMinSessionTimeoutMs = getInt(KafkaConfig.GroupMinSessionTimeoutMsProp)
val groupMaxSessionTimeoutMs = getInt(KafkaConfig.GroupMaxSessionTimeoutMsProp)
// Note the field name has no `Ms` suffix although it is read from GroupInitialRebalanceDelayMsProp.
val groupInitialRebalanceDelay = getInt(KafkaConfig.GroupInitialRebalanceDelayMsProp)
val groupMaxSize = getInt(KafkaConfig.GroupMaxSizeProp)
/** ********* Offset management configuration ***********/
// All `val`s, read once when this object is constructed.
val offsetMetadataMaxSize = getInt(KafkaConfig.OffsetMetadataMaxSizeProp)
val offsetsLoadBufferSize = getInt(KafkaConfig.OffsetsLoadBufferSizeProp)
val offsetsTopicReplicationFactor = getShort(KafkaConfig.OffsetsTopicReplicationFactorProp)
val offsetsTopicPartitions = getInt(KafkaConfig.OffsetsTopicPartitionsProp)
val offsetCommitTimeoutMs = getInt(KafkaConfig.OffsetCommitTimeoutMsProp)
val offsetCommitRequiredAcks = getShort(KafkaConfig.OffsetCommitRequiredAcksProp)
val offsetsTopicSegmentBytes = getInt(KafkaConfig.OffsetsTopicSegmentBytesProp)
// The configured codec id is mapped through CompressionCodec.getCompressionCodec; because of
// `.orNull`, this field is null (not an Option) when the configured id itself is null.
val offsetsTopicCompressionCodec = Option(getInt(KafkaConfig.OffsetsTopicCompressionCodecProp)).map(value => CompressionCodec.getCompressionCodec(value)).orNull
/** ********* Transaction management configuration ***********/
// All `val`s, read once when this object is constructed. Field names use the singular `transaction`
// prefix (except `transactionsLoadBufferSize`) while the property constants use `Transactions...`.
val transactionalIdExpirationMs = getInt(KafkaConfig.TransactionalIdExpirationMsProp)
val transactionMaxTimeoutMs = getInt(KafkaConfig.TransactionsMaxTimeoutMsProp)
val transactionTopicMinISR = getInt(KafkaConfig.TransactionsTopicMinISRProp)
val transactionsLoadBufferSize = getInt(KafkaConfig.TransactionsLoadBufferSizeProp)
val transactionTopicReplicationFactor = getShort(KafkaConfig.TransactionsTopicReplicationFactorProp)
val transactionTopicPartitions = getInt(KafkaConfig.TransactionsTopicPartitionsProp)
val transactionTopicSegmentBytes = getInt(KafkaConfig.TransactionsTopicSegmentBytesProp)
val transactionAbortTimedOutTransactionCleanupIntervalMs = getInt(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp)
val transactionRemoveExpiredTransactionalIdCleanupIntervalMs = getInt(KafkaConfig.TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp)
/** ********* Metric Configuration **************/
// All `val`s, read once when this object is constructed; the recording level is kept as a String.
val metricNumSamples = getInt(KafkaConfig.MetricNumSamplesProp)
val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp)
val metricRecordingLevel = getString(KafkaConfig.MetricRecordingLevelProp)
/** ********* SSL/SASL Configuration **************/
// Security configs may be overridden per listener, so it is not safe to use the base values.
// Hence the base SSL/SASL configs are not fields of KafkaConfig; listener configs should be
// retrieved using KafkaConfig#valuesWithPrefixOverride.
// Returns the SASL mechanisms enabled for `listenerName`, looked up in the config values with the
// listener's prefix overrides applied. Yields an empty set when no value is present for the key.
private def saslEnabledMechanisms(listenerName: ListenerName): Set[String] = {
  val listenerConfigs = valuesWithPrefixOverride(listenerName.configPrefix)
  Option(listenerConfigs.get(KafkaConfig.SaslEnabledMechanismsProp))
    .map(mechanisms => mechanisms.asInstanceOf[util.List[String]].asScala.toSet)
    .getOrElse(Set.empty[String])
}