in statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaSourceProvider.java [54:73]
private static <T> void configureStartupPosition(
FlinkKafkaConsumer<T> consumer, KafkaIngressStartupPosition startupPosition) {
if (startupPosition.isGroupOffsets()) {
consumer.setStartFromGroupOffsets();
} else if (startupPosition.isEarliest()) {
consumer.setStartFromEarliest();
} else if (startupPosition.isLatest()) {
consumer.setStartFromLatest();
} else if (startupPosition.isSpecificOffsets()) {
KafkaIngressStartupPosition.SpecificOffsetsPosition offsetsPosition =
startupPosition.asSpecificOffsets();
consumer.setStartFromSpecificOffsets(
convertKafkaTopicPartitionMap(offsetsPosition.specificOffsets()));
} else if (startupPosition.isDate()) {
KafkaIngressStartupPosition.DatePosition datePosition = startupPosition.asDate();
consumer.setStartFromTimestamp(datePosition.epochMilli());
} else {
throw new IllegalStateException("Safe guard; should not occur");
}
}