protected KafkaSource<RowData> createKafkaSource(DeserializationSchema<RowData>, DeserializationSchema<RowData>, TypeInformation<RowData>)

in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java [429:502]
    protected KafkaSource<RowData> createKafkaSource(
            DeserializationSchema<RowData> keyDeserialization,
            DeserializationSchema<RowData> valueDeserialization,
            TypeInformation<RowData> producedTypeInfo) {

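        // Combine the key and value deserializers into a single record deserializer that emits RowData.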
        final KafkaRecordDeserializationSchema<RowData> kafkaDeserializer =
                createKafkaDeserializationSchema(
                        keyDeserialization, valueDeserialization, producedTypeInfo);

        final KafkaSourceBuilder<RowData> kafkaSourceBuilder = KafkaSource.builder();

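        // Subscribe to the fixed topic list when one is configured; otherwise use the topic pattern.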
        if (topics != null) {
            kafkaSourceBuilder.setTopics(topics);
        } else {
            kafkaSourceBuilder.setTopicPattern(topicPattern);
        }

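        // Map the configured startup mode onto a starting OffsetsInitializer.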
        switch (startupMode) {
            case EARLIEST:
                kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.earliest());
                break;
            case LATEST:
                kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.latest());
                break;
            case GROUP_OFFSETS:
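                // Respect the consumer's auto.offset.reset property; default to NONE so that a
                // missing committed offset fails fast instead of silently resetting.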
                String offsetResetConfig =
                        properties.getProperty(
                                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                                OffsetResetStrategy.NONE.name());
                OffsetResetStrategy offsetResetStrategy = getResetStrategy(offsetResetConfig);
                kafkaSourceBuilder.setStartingOffsets(
                        OffsetsInitializer.committedOffsets(offsetResetStrategy));
                break;
            case SPECIFIC_OFFSETS:
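                // Convert the Flink-side partition keys into Kafka client TopicPartitions.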
                Map<TopicPartition, Long> offsets = new HashMap<>();
                specificStartupOffsets.forEach(
                        (tp, offset) ->
                                offsets.put(
                                        new TopicPartition(tp.topic(), tp.partition()), offset));
                kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.offsets(offsets));
                break;
            case TIMESTAMP:
                kafkaSourceBuilder.setStartingOffsets(
                        OffsetsInitializer.timestamp(startupTimestampMillis));
                break;
        }

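        // Map the configured bounded mode onto a stopping OffsetsInitializer; UNBOUNDED
        // installs a NoStoppingOffsetsInitializer so the source keeps running as a stream.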
        switch (boundedMode) {
            case UNBOUNDED:
                kafkaSourceBuilder.setUnbounded(new NoStoppingOffsetsInitializer());
                break;
            case LATEST:
                kafkaSourceBuilder.setBounded(OffsetsInitializer.latest());
                break;
            case GROUP_OFFSETS:
                kafkaSourceBuilder.setBounded(OffsetsInitializer.committedOffsets());
                break;
            case SPECIFIC_OFFSETS:
                Map<TopicPartition, Long> offsets = new HashMap<>();
                specificBoundedOffsets.forEach(
                        (tp, offset) ->
                                offsets.put(
                                        new TopicPartition(tp.topic(), tp.partition()), offset));
                kafkaSourceBuilder.setBounded(OffsetsInitializer.offsets(offsets));
                break;
            case TIMESTAMP:
                kafkaSourceBuilder.setBounded(OffsetsInitializer.timestamp(boundedTimestampMillis));
                break;
        }

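        // Hand the user-supplied Kafka properties and the combined deserializer to the builder.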
        kafkaSourceBuilder.setProperties(properties).setDeserializer(kafkaDeserializer);

        return kafkaSourceBuilder.build();
    }
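
For reference, here is a minimal standalone sketch of the same KafkaSourceBuilder API that this method drives. The bootstrap servers, topic, and group id are placeholder values, and SimpleStringSchema stands in for the RowData deserializer that the table planner supplies; treat it as an illustration of the builder calls, not the connector's own code.

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;

    public class KafkaSourceSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Placeholder connection settings; replace with real broker addresses and topics.
            KafkaSource<String> source =
                    KafkaSource.<String>builder()
                            .setBootstrapServers("localhost:9092")
                            .setTopics("input-topic")
                            .setGroupId("my-group")
                            // Same initializer as the GROUP_OFFSETS startup branch, here with an
                            // explicit fallback when no committed offset exists.
                            .setStartingOffsets(
                                    OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                            // Comparable to boundedMode == LATEST: stop once the latest offsets
                            // observed at startup have been consumed.
                            .setBounded(OffsetsInitializer.latest())
                            .setValueOnlyDeserializer(new SimpleStringSchema())
                            .build();

            DataStream<String> stream =
                    env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");
            stream.print();

            env.execute("kafka-source-sketch");
        }
    }

Note that calling setBounded is what flips the source to Boundedness.BOUNDED, which mirrors the boundedMode switch above.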