in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java [439:474]
/**
 * Passes the consumer and source settings this builder depends on to {@code maybeOverride}.
 *
 * <p>In order, this method handles: the key and value deserializer classes, the
 * commit-offsets-on-checkpoint option (only when no consumer group id is present in {@code
 * props}), the Kafka auto-commit flag, the auto offset reset strategy, the partition discovery
 * interval, and the client id prefix.
 *
 * <p>NOTE(review): {@code maybeOverride} is defined outside this method; its third argument is
 * presumably the "replace a user-supplied value" flag -- confirm against its definition.
 */
private void parseAndSetRequiredProperties() {
    final String byteArrayDeserializer = ByteArrayDeserializer.class.getName();
    maybeOverride(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, byteArrayDeserializer, true);
    maybeOverride(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, byteArrayDeserializer, true);

    final boolean hasGroupId = props.containsKey(ConsumerConfig.GROUP_ID_CONFIG);
    if (!hasGroupId) {
        LOG.warn(
                "Offset commit on checkpoint is disabled because {} is not specified",
                ConsumerConfig.GROUP_ID_CONFIG);
        maybeOverride(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false", false);
    }
    maybeOverride(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", false);

    // The enum constant name must be lower-cased independently of the JVM default locale:
    // with e.g. a Turkish default locale, "EARLIEST".toLowerCase() yields a dotless 'i',
    // which is not a valid value for this Kafka setting.
    final String autoOffsetReset =
            startingOffsetsInitializer
                    .getAutoOffsetResetStrategy()
                    .name()
                    .toLowerCase(java.util.Locale.ROOT);
    maybeOverride(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset, true);

    // If the source is bounded, do not run periodic partition discovery.
    maybeOverride(
            KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
            "-1",
            boundedness == Boundedness.BOUNDED);

    // If the client id prefix is not set, reuse the consumer group id as the client id prefix,
    // or generate a random string if consumer group id is not specified.
    final String clientIdPrefix =
            hasGroupId
                    ? props.getProperty(ConsumerConfig.GROUP_ID_CONFIG)
                    : "KafkaSource-" + new Random().nextLong();
    maybeOverride(KafkaSourceOptions.CLIENT_ID_PREFIX.key(), clientIdPrefix, false);
}