private void parseAndSetRequiredProperties()

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java [455:489]


    private void parseAndSetRequiredProperties() {
        maybeOverride(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class.getName(),
                false);
        maybeOverride(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class.getName(),
                false);
        if (!props.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
            LOG.warn(
                    "Offset commit on checkpoint is disabled because {} is not specified",
                    ConsumerConfig.GROUP_ID_CONFIG);
            maybeOverride(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false", false);
        }
        maybeOverride(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", false);
        maybeOverride(
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(),
                true);

        // If the source is bounded, do not run periodic partition discovery.
        if (boundedness == Boundedness.BOUNDED) {
            maybeOverride(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1", true);
        }

        // If the client id prefix is not set, reuse the consumer group id as the client id prefix,
        // or generate a random string if consumer group id is not specified.
        maybeOverride(
                KafkaSourceOptions.CLIENT_ID_PREFIX.key(),
                props.containsKey(ConsumerConfig.GROUP_ID_CONFIG)
                        ? props.getProperty(ConsumerConfig.GROUP_ID_CONFIG)
                        : "KafkaSource-" + new Random().nextLong(),
                false);
    }