in connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java [312:541]
private static ConfigDef config(Crypto crypto) {
    // Starts from baseConfigDef() and appends every definition below, in the order written.
    // The chain is split into segments purely for readability; each segment continues from
    // the ConfigDef returned by the previous one.
    //
    // crypto is handed to the helpers that compute the defaults for, and validate, the
    // inter-worker key generation, signature, and verification algorithm settings.

    // Group id, session/rebalance/heartbeat timing, and exactly-once source support.
    ConfigDef configDef = baseConfigDef()
        .define(GROUP_ID_CONFIG, ConfigDef.Type.STRING,
                ConfigDef.Importance.HIGH, GROUP_ID_DOC)
        .define(SESSION_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT,
                Math.toIntExact(TimeUnit.SECONDS.toMillis(10)),
                ConfigDef.Importance.HIGH, SESSION_TIMEOUT_MS_DOC)
        .define(REBALANCE_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT,
                Math.toIntExact(TimeUnit.MINUTES.toMillis(1)),
                ConfigDef.Importance.HIGH, REBALANCE_TIMEOUT_MS_DOC)
        .define(HEARTBEAT_INTERVAL_MS_CONFIG, ConfigDef.Type.INT,
                Math.toIntExact(TimeUnit.SECONDS.toMillis(3)),
                ConfigDef.Importance.HIGH, HEARTBEAT_INTERVAL_MS_DOC)
        .define(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, ConfigDef.Type.STRING,
                EXACTLY_ONCE_SOURCE_SUPPORT_DEFAULT,
                in(enumOptions(ExactlyOnceSourceSupport.class)),
                ConfigDef.Importance.HIGH, EXACTLY_ONCE_SOURCE_SUPPORT_DOC);

    // Settings keyed by CommonClientConfigs constants: metadata age, client id, socket
    // buffers, reconnect/retry backoffs, connection setup and request timeouts.
    configDef = configDef
        .define(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, ConfigDef.Type.LONG,
                TimeUnit.MINUTES.toMillis(5), atLeast(0),
                ConfigDef.Importance.LOW, CommonClientConfigs.METADATA_MAX_AGE_DOC)
        .define(CommonClientConfigs.CLIENT_ID_CONFIG, ConfigDef.Type.STRING,
                "",
                ConfigDef.Importance.LOW, CommonClientConfigs.CLIENT_ID_DOC)
        .define(CommonClientConfigs.SEND_BUFFER_CONFIG, ConfigDef.Type.INT,
                128 * 1024, atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND),
                ConfigDef.Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
        .define(CommonClientConfigs.RECEIVE_BUFFER_CONFIG, ConfigDef.Type.INT,
                32 * 1024, atLeast(CommonClientConfigs.RECEIVE_BUFFER_LOWER_BOUND),
                ConfigDef.Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC)
        .define(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, ConfigDef.Type.LONG,
                50L, atLeast(0L),
                ConfigDef.Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
        .define(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG, ConfigDef.Type.LONG,
                TimeUnit.SECONDS.toMillis(1), atLeast(0L),
                ConfigDef.Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC)
        .define(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, ConfigDef.Type.LONG,
                CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS, atLeast(0L),
                ConfigDef.Importance.LOW,
                CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC)
        .define(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, ConfigDef.Type.LONG,
                CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS, atLeast(0L),
                ConfigDef.Importance.LOW,
                CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC)
        .define(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, ConfigDef.Type.LONG,
                CommonClientConfigs.DEFAULT_RETRY_BACKOFF_MS, atLeast(0L),
                ConfigDef.Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
        .define(CommonClientConfigs.RETRY_BACKOFF_MAX_MS_CONFIG, ConfigDef.Type.LONG,
                CommonClientConfigs.DEFAULT_RETRY_BACKOFF_MAX_MS, atLeast(0L),
                ConfigDef.Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MAX_MS_DOC)
        .define(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT,
                Math.toIntExact(TimeUnit.SECONDS.toMillis(40)), atLeast(0),
                ConfigDef.Importance.MEDIUM, CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC);

    // The idle-connection default is kept a bit below the server default (10 min) so that
    // client and server do not both close the connection at the same time.
    configDef = configDef
        .define(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG, ConfigDef.Type.LONG,
                TimeUnit.MINUTES.toMillis(9),
                ConfigDef.Importance.MEDIUM, CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC);

    // Security support: security protocol plus the client SASL definitions.
    configDef = configDef
        .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, ConfigDef.Type.STRING,
                CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
                in(Utils.enumOptions(SecurityProtocol.class)),
                ConfigDef.Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
        .withClientSaslSupport();

    // Worker sync timeout and unsync backoff.
    configDef = configDef
        .define(WORKER_SYNC_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT,
                3000,
                ConfigDef.Importance.MEDIUM, WORKER_SYNC_TIMEOUT_MS_DOC)
        .define(WORKER_UNSYNC_BACKOFF_MS_CONFIG, ConfigDef.Type.INT,
                WORKER_UNSYNC_BACKOFF_MS_DEFAULT,
                ConfigDef.Importance.MEDIUM, WORKER_UNSYNC_BACKOFF_MS_DOC);

    // Offset, config, and status storage topics with their partition counts and
    // replication factors.
    configDef = configDef
        .define(OFFSET_STORAGE_TOPIC_CONFIG, ConfigDef.Type.STRING,
                ConfigDef.Importance.HIGH, OFFSET_STORAGE_TOPIC_CONFIG_DOC)
        .define(OFFSET_STORAGE_PARTITIONS_CONFIG, ConfigDef.Type.INT,
                25, PARTITIONS_VALIDATOR,
                ConfigDef.Importance.LOW, OFFSET_STORAGE_PARTITIONS_CONFIG_DOC)
        .define(OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT,
                (short) 3, REPLICATION_FACTOR_VALIDATOR,
                ConfigDef.Importance.LOW, OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG_DOC)
        .define(CONFIG_TOPIC_CONFIG, ConfigDef.Type.STRING,
                ConfigDef.Importance.HIGH, CONFIG_TOPIC_CONFIG_DOC)
        .define(CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT,
                (short) 3, REPLICATION_FACTOR_VALIDATOR,
                ConfigDef.Importance.LOW, CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG_DOC)
        .define(STATUS_STORAGE_TOPIC_CONFIG, ConfigDef.Type.STRING,
                ConfigDef.Importance.HIGH, STATUS_STORAGE_TOPIC_CONFIG_DOC)
        .define(STATUS_STORAGE_PARTITIONS_CONFIG, ConfigDef.Type.INT,
                5, PARTITIONS_VALIDATOR,
                ConfigDef.Importance.LOW, STATUS_STORAGE_PARTITIONS_CONFIG_DOC)
        .define(STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT,
                (short) 3, REPLICATION_FACTOR_VALIDATOR,
                ConfigDef.Importance.LOW, STATUS_STORAGE_REPLICATION_FACTOR_CONFIG_DOC);

    // Validator for the Connect protocol setting: any failure while resolving the value via
    // ConnectProtocolCompatibility.compatibility is reported as a ConfigException. The
    // underlying failure is discarded; callers only see the ConfigException. The validator's
    // description lists every ConnectProtocolCompatibility value as "[a, b, ...]".
    ConfigDef.Validator protocolValidator = ConfigDef.LambdaValidator.with(
        (configName, configValue) -> {
            try {
                ConnectProtocolCompatibility.compatibility((String) configValue);
            } catch (Throwable ignored) {
                throw new ConfigException(configName, configValue,
                        "Invalid Connect protocol compatibility");
            }
        },
        () -> Arrays.stream(ConnectProtocolCompatibility.values())
                .map(ConnectProtocolCompatibility::toString)
                .collect(Collectors.joining(", ", "[", "]")));

    // Connect protocol and scheduled rebalance delay.
    configDef = configDef
        .define(CONNECT_PROTOCOL_CONFIG, ConfigDef.Type.STRING,
                CONNECT_PROTOCOL_DEFAULT, protocolValidator,
                ConfigDef.Importance.LOW, CONNECT_PROTOCOL_DOC)
        .define(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, ConfigDef.Type.INT,
                SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT, between(0, Integer.MAX_VALUE),
                ConfigDef.Importance.LOW, SCHEDULED_REBALANCE_MAX_DELAY_MS_DOC);

    // Inter-worker key and signature settings; the algorithm defaults and validators are all
    // derived from the supplied crypto.
    configDef = configDef
        .define(INTER_WORKER_KEY_TTL_MS_CONFIG, ConfigDef.Type.INT,
                INTER_WORKER_KEY_TTL_MS_MS_DEFAULT, between(0, Integer.MAX_VALUE),
                ConfigDef.Importance.LOW, INTER_WORKER_KEY_TTL_MS_MS_DOC)
        .define(INTER_WORKER_KEY_GENERATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING,
                defaultKeyGenerationAlgorithm(crypto),
                ConfigDef.LambdaValidator.with(
                    (key, algorithm) -> validateKeyAlgorithm(crypto, key, (String) algorithm),
                    () -> "Any KeyGenerator algorithm supported by the worker JVM"),
                ConfigDef.Importance.LOW, INTER_WORKER_KEY_GENERATION_ALGORITHM_DOC)
        .define(INTER_WORKER_KEY_SIZE_CONFIG, ConfigDef.Type.INT,
                INTER_WORKER_KEY_SIZE_DEFAULT,
                ConfigDef.Importance.LOW, INTER_WORKER_KEY_SIZE_DOC)
        .define(INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG, ConfigDef.Type.STRING,
                defaultSignatureAlgorithm(crypto),
                ConfigDef.LambdaValidator.with(
                    (key, algorithm) -> validateSignatureAlgorithm(crypto, key, (String) algorithm),
                    () -> "Any MAC algorithm supported by the worker JVM"),
                ConfigDef.Importance.LOW, INTER_WORKER_SIGNATURE_ALGORITHM_DOC)
        .define(INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG, ConfigDef.Type.LIST,
                defaultVerificationAlgorithms(crypto),
                ConfigDef.LambdaValidator.with(
                    (key, algorithms) ->
                        validateVerificationAlgorithms(crypto, key, (List<String>) algorithms),
                    () -> "A list of one or more MAC algorithms, each supported by the worker JVM"),
                ConfigDef.Importance.LOW, INTER_WORKER_VERIFICATION_ALGORITHMS_DOC);

    // Metadata recovery strategy (matched case-insensitively) and rebootstrap trigger.
    return configDef
        .define(METADATA_RECOVERY_STRATEGY_CONFIG, ConfigDef.Type.STRING,
                DEFAULT_METADATA_RECOVERY_STRATEGY,
                ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
                ConfigDef.Importance.LOW, METADATA_RECOVERY_STRATEGY_DOC)
        .define(METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG, ConfigDef.Type.LONG,
                DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS, atLeast(0),
                ConfigDef.Importance.LOW, METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC);
}