in streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java [888:1276]
static {
CONFIG = new ConfigDef()
// HIGH
.define(APPLICATION_ID_CONFIG, // required with no default value
Type.STRING,
Importance.HIGH,
APPLICATION_ID_DOC)
.define(BOOTSTRAP_SERVERS_CONFIG, // required with no default value
Type.LIST,
Importance.HIGH,
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
.define(NUM_STANDBY_REPLICAS_CONFIG,
Type.INT,
0,
Importance.HIGH,
NUM_STANDBY_REPLICAS_DOC)
.define(STATE_DIR_CONFIG,
Type.STRING,
System.getProperty("java.io.tmpdir") + File.separator + "kafka-streams",
Importance.HIGH,
STATE_DIR_DOC,
"${java.io.tmpdir}")
.define(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG,
Type.BOOLEAN,
false,
Importance.HIGH,
ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC)
// MEDIUM
.define(ACCEPTABLE_RECOVERY_LAG_CONFIG,
Type.LONG,
10_000L,
atLeast(0),
Importance.MEDIUM,
ACCEPTABLE_RECOVERY_LAG_DOC)
.define(CACHE_MAX_BYTES_BUFFERING_CONFIG,
Type.LONG,
10 * 1024 * 1024L,
atLeast(0),
Importance.MEDIUM,
CACHE_MAX_BYTES_BUFFERING_DOC)
.define(STATESTORE_CACHE_MAX_BYTES_CONFIG,
Type.LONG,
10 * 1024 * 1024L,
atLeast(0),
Importance.MEDIUM,
STATESTORE_CACHE_MAX_BYTES_DOC)
.define(CLIENT_ID_CONFIG,
Type.STRING,
"",
Importance.MEDIUM,
CLIENT_ID_DOC,
"<code><application.id>-<random-UUID></code>")
.define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
Type.CLASS,
LogAndFailExceptionHandler.class.getName(),
Importance.MEDIUM,
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
.define(DEFAULT_KEY_SERDE_CLASS_CONFIG,
Type.CLASS,
null,
Importance.MEDIUM,
DEFAULT_KEY_SERDE_CLASS_DOC)
.define(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS,
Type.CLASS,
null,
Importance.MEDIUM,
CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS_DOC)
.define(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS,
Type.CLASS,
null,
Importance.MEDIUM,
CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS_DOC)
.define(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS,
Type.CLASS,
null,
Importance.MEDIUM,
CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS_DOC)
.define(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS,
Type.CLASS,
null,
Importance.MEDIUM,
CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS_DOC)
.define(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
Type.CLASS,
DefaultProductionExceptionHandler.class.getName(),
Importance.MEDIUM,
PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC)
.define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
Type.CLASS,
FailOnInvalidTimestamp.class.getName(),
Importance.MEDIUM,
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC)
.define(DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Type.CLASS,
null,
Importance.MEDIUM,
DEFAULT_VALUE_SERDE_CLASS_DOC)
.define(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
Type.CLASS,
LogAndFailExceptionHandler.class.getName(),
Importance.MEDIUM,
DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
.define(MAX_TASK_IDLE_MS_CONFIG,
Type.LONG,
0L,
Importance.MEDIUM,
MAX_TASK_IDLE_MS_DOC)
.define(MAX_WARMUP_REPLICAS_CONFIG,
Type.INT,
2,
atLeast(1),
Importance.MEDIUM,
MAX_WARMUP_REPLICAS_DOC)
.define(NUM_STREAM_THREADS_CONFIG,
Type.INT,
1,
Importance.MEDIUM,
NUM_STREAM_THREADS_DOC)
.define(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
Type.CLASS,
LogAndFailProcessingExceptionHandler.class.getName(),
Importance.MEDIUM,
PROCESSING_EXCEPTION_HANDLER_CLASS_DOC)
.define(PROCESSING_GUARANTEE_CONFIG,
Type.STRING,
AT_LEAST_ONCE,
in(AT_LEAST_ONCE, EXACTLY_ONCE_V2),
Importance.MEDIUM,
PROCESSING_GUARANTEE_DOC)
.define(PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
Type.CLASS,
DefaultProductionExceptionHandler.class.getName(),
Importance.MEDIUM,
PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC)
.define(TASK_ASSIGNOR_CLASS_CONFIG,
Type.STRING,
null,
Importance.MEDIUM,
TASK_ASSIGNOR_CLASS_DOC)
.define(REPLICATION_FACTOR_CONFIG,
Type.INT,
-1,
Importance.MEDIUM,
REPLICATION_FACTOR_DOC)
.define(SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(SecurityProtocol.class)),
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(TASK_TIMEOUT_MS_CONFIG,
Type.LONG,
Duration.ofMinutes(5L).toMillis(),
atLeast(0L),
Importance.MEDIUM,
TASK_TIMEOUT_MS_DOC)
.define(TOPOLOGY_OPTIMIZATION_CONFIG,
Type.STRING,
NO_OPTIMIZATION,
ConfigDef.LambdaValidator.with(
(name, value) -> verifyTopologyOptimizationConfigs((String) value),
TOPOLOGY_OPTIMIZATION_CONFIGS::toString),
Importance.MEDIUM,
TOPOLOGY_OPTIMIZATION_DOC)
.define(GROUP_PROTOCOL_CONFIG,
Type.STRING,
DEFAULT_GROUP_PROTOCOL,
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(GroupProtocol.class)),
Importance.MEDIUM,
GROUP_PROTOCOL_DOC)
// LOW
.define(APPLICATION_SERVER_CONFIG,
Type.STRING,
"",
Importance.LOW,
APPLICATION_SERVER_DOC)
.define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
Type.INT,
1000,
Importance.LOW,
BUFFERED_RECORDS_PER_PARTITION_DOC)
.define(BUILT_IN_METRICS_VERSION_CONFIG,
Type.STRING,
METRICS_LATEST,
in(
METRICS_LATEST
),
Importance.LOW,
BUILT_IN_METRICS_VERSION_DOC)
.define(COMMIT_INTERVAL_MS_CONFIG,
Type.LONG,
DEFAULT_COMMIT_INTERVAL_MS,
atLeast(0),
Importance.LOW,
COMMIT_INTERVAL_MS_DOC)
.define(ENABLE_METRICS_PUSH_CONFIG,
Type.BOOLEAN,
true,
Importance.LOW,
ENABLE_METRICS_PUSH_DOC)
.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG,
Type.INT,
null,
Importance.LOW,
RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC)
.define(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG,
Type.STRING,
RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
in(RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC, RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY),
Importance.LOW,
RACK_AWARE_ASSIGNMENT_STRATEGY_DOC)
.define(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
Type.LIST,
Collections.emptyList(),
atMostOfSize(MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE),
Importance.LOW,
RACK_AWARE_ASSIGNMENT_TAGS_DOC)
.define(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG,
Type.INT,
null,
Importance.LOW,
RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC)
.define(REPARTITION_PURGE_INTERVAL_MS_CONFIG,
Type.LONG,
DEFAULT_COMMIT_INTERVAL_MS,
atLeast(0),
Importance.LOW,
REPARTITION_PURGE_INTERVAL_MS_DOC)
.define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
Type.LONG,
9 * 60 * 1000L,
Importance.LOW,
CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
.define(DEFAULT_DSL_STORE_CONFIG,
Type.STRING,
DEFAULT_DSL_STORE,
in(ROCKS_DB, IN_MEMORY),
Importance.LOW,
DEFAULT_DSL_STORE_DOC)
.define(DSL_STORE_SUPPLIERS_CLASS_CONFIG,
Type.CLASS,
DSL_STORE_SUPPLIERS_CLASS_DEFAULT,
Importance.LOW,
DSL_STORE_SUPPLIERS_CLASS_DOC)
.define(DEFAULT_CLIENT_SUPPLIER_CONFIG,
Type.CLASS,
DefaultKafkaClientSupplier.class.getName(),
Importance.LOW,
DEFAULT_CLIENT_SUPPLIER_DOC)
.define(LOG_SUMMARY_INTERVAL_MS_CONFIG,
Type.LONG,
2 * 60 * 1000L,
Importance.LOW,
LOG_SUMMARY_INTERVAL_MS_DOC)
.define(METADATA_MAX_AGE_CONFIG,
Type.LONG,
5 * 60 * 1000L,
atLeast(0),
Importance.LOW,
CommonClientConfigs.METADATA_MAX_AGE_DOC)
.define(METRICS_NUM_SAMPLES_CONFIG,
Type.INT,
2,
atLeast(1),
Importance.LOW,
CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
.define(METRIC_REPORTER_CLASSES_CONFIG,
Type.LIST,
JmxReporter.class.getName(),
Importance.LOW,
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
.define(METRICS_RECORDING_LEVEL_CONFIG,
Type.STRING,
Sensor.RecordingLevel.INFO.toString(),
in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString(), RecordingLevel.TRACE.toString()),
Importance.LOW,
CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
.define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
Type.LONG,
30000L,
atLeast(0),
Importance.LOW,
CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
.define(POLL_MS_CONFIG,
Type.LONG,
100L,
Importance.LOW,
POLL_MS_DOC)
.define(PROBING_REBALANCE_INTERVAL_MS_CONFIG,
Type.LONG,
10 * 60 * 1000L,
atLeast(60 * 1000L),
Importance.LOW,
PROBING_REBALANCE_INTERVAL_MS_DOC)
.define(PROCESSOR_WRAPPER_CLASS_CONFIG,
Type.CLASS,
NoOpProcessorWrapper.class,
Importance.LOW,
PROCESSOR_WRAPPER_CLASS_DOC)
.define(RECEIVE_BUFFER_CONFIG,
Type.INT,
32 * 1024,
atLeast(CommonClientConfigs.RECEIVE_BUFFER_LOWER_BOUND),
Importance.LOW,
CommonClientConfigs.RECEIVE_BUFFER_DOC)
.define(RECONNECT_BACKOFF_MS_CONFIG,
Type.LONG,
50L,
atLeast(0L),
Importance.LOW,
CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
.define(RECONNECT_BACKOFF_MAX_MS_CONFIG,
Type.LONG,
1000L,
atLeast(0L),
Importance.LOW,
CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC)
.define(RETRY_BACKOFF_MS_CONFIG,
Type.LONG,
100L,
atLeast(0L),
Importance.LOW,
CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
.define(REQUEST_TIMEOUT_MS_CONFIG,
Type.INT,
40 * 1000,
atLeast(0),
Importance.LOW,
CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC)
.define(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY,
ConfigDef.CaseInsensitiveValidString
.in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
Importance.LOW,
CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC)
.define(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
Type.LONG,
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS,
atLeast(0),
Importance.LOW,
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
.define(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
Type.CLASS,
null,
Importance.LOW,
ROCKSDB_CONFIG_SETTER_CLASS_DOC)
.define(SEND_BUFFER_CONFIG,
Type.INT,
128 * 1024,
atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND),
Importance.LOW,
CommonClientConfigs.SEND_BUFFER_DOC)
.define(STATE_CLEANUP_DELAY_MS_CONFIG,
Type.LONG,
10 * 60 * 1000L,
Importance.LOW,
STATE_CLEANUP_DELAY_MS_DOC)
.define(UPGRADE_FROM_CONFIG,
Type.STRING,
null,
in(Stream.concat(
Stream.of((String) null),
Arrays.stream(UpgradeFromValues.values()).map(UpgradeFromValues::toString)
).toArray(String[]::new)),
Importance.LOW,
UPGRADE_FROM_DOC)
.define(WINDOWED_INNER_CLASS_SERDE,
Type.STRING,
null,
Importance.LOW,
WINDOWED_INNER_CLASS_SERDE_DOC)
.define(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
Type.LONG,
24 * 60 * 60 * 1000L,
Importance.LOW,
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC)
.define(WINDOW_SIZE_MS_CONFIG,
Type.LONG,
null,
Importance.LOW,
WINDOW_SIZE_MS_DOC);
}