in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java [429:502]
/**
 * Assembles the {@link KafkaSource} for this table source.
 *
 * <p>The source subscribes to {@code topics} when that field is set and to {@code topicPattern}
 * otherwise. Starting offsets are chosen from {@code startupMode}, boundedness and stopping
 * offsets from {@code boundedMode}, and the consumer {@code properties} plus the record
 * deserializer are applied last, right before the source is built.
 *
 * @param keyDeserialization key schema handed to {@code createKafkaDeserializationSchema}
 * @param valueDeserialization value schema handed to {@code createKafkaDeserializationSchema}
 * @param producedTypeInfo type information handed to {@code createKafkaDeserializationSchema}
 * @return the source produced by the configured builder
 */
protected KafkaSource<RowData> createKafkaSource(
        DeserializationSchema<RowData> keyDeserialization,
        DeserializationSchema<RowData> valueDeserialization,
        TypeInformation<RowData> producedTypeInfo) {
    final KafkaRecordDeserializationSchema<RowData> recordDeserializer =
            createKafkaDeserializationSchema(
                    keyDeserialization, valueDeserialization, producedTypeInfo);
    final KafkaSourceBuilder<RowData> builder = KafkaSource.builder();

    // An explicit topic list wins; the pattern is only consulted when no list is present.
    if (topics == null) {
        builder.setTopicPattern(topicPattern);
    } else {
        builder.setTopics(topics);
    }

    applyStartupMode(builder);
    applyBoundedMode(builder);

    builder.setProperties(properties).setDeserializer(recordDeserializer);
    return builder.build();
}

/**
 * Sets the starting offsets on {@code builder} according to {@code startupMode}.
 *
 * <p>A mode without a matching case leaves the builder untouched.
 */
private void applyStartupMode(KafkaSourceBuilder<RowData> builder) {
    switch (startupMode) {
        case EARLIEST:
            builder.setStartingOffsets(OffsetsInitializer.earliest());
            return;
        case LATEST:
            builder.setStartingOffsets(OffsetsInitializer.latest());
            return;
        case GROUP_OFFSETS:
            {
                // The reset strategy comes from the consumer properties and falls back to
                // the name of OffsetResetStrategy.NONE when the key is absent.
                final String resetStrategyName =
                        properties.getProperty(
                                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                                OffsetResetStrategy.NONE.name());
                final OffsetResetStrategy resetStrategy = getResetStrategy(resetStrategyName);
                builder.setStartingOffsets(OffsetsInitializer.committedOffsets(resetStrategy));
                return;
            }
        case SPECIFIC_OFFSETS:
            {
                // Copy into a fresh map, rebuilding every key as a new TopicPartition.
                final Map<TopicPartition, Long> startingOffsets = new HashMap<>();
                specificStartupOffsets.forEach(
                        (partition, position) -> {
                            final TopicPartition key =
                                    new TopicPartition(
                                            partition.topic(), partition.partition());
                            startingOffsets.put(key, position);
                        });
                builder.setStartingOffsets(OffsetsInitializer.offsets(startingOffsets));
                return;
            }
        case TIMESTAMP:
            builder.setStartingOffsets(OffsetsInitializer.timestamp(startupTimestampMillis));
            return;
    }
}

/**
 * Sets boundedness and stopping offsets on {@code builder} according to {@code boundedMode}.
 *
 * <p>A mode without a matching case leaves the builder untouched.
 */
private void applyBoundedMode(KafkaSourceBuilder<RowData> builder) {
    switch (boundedMode) {
        case UNBOUNDED:
            builder.setUnbounded(new NoStoppingOffsetsInitializer());
            return;
        case LATEST:
            builder.setBounded(OffsetsInitializer.latest());
            return;
        case GROUP_OFFSETS:
            builder.setBounded(OffsetsInitializer.committedOffsets());
            return;
        case SPECIFIC_OFFSETS:
            {
                // Copy into a fresh map, rebuilding every key as a new TopicPartition.
                final Map<TopicPartition, Long> stoppingOffsets = new HashMap<>();
                specificBoundedOffsets.forEach(
                        (partition, position) -> {
                            final TopicPartition key =
                                    new TopicPartition(
                                            partition.topic(), partition.partition());
                            stoppingOffsets.put(key, position);
                        });
                builder.setBounded(OffsetsInitializer.offsets(stoppingOffsets));
                return;
            }
        case TIMESTAMP:
            builder.setBounded(OffsetsInitializer.timestamp(boundedTimestampMillis));
            return;
    }
}