private static void configureStartupPosition()

in statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaSourceProvider.java [54:73]


  /**
   * Applies the given ingress startup position to the Kafka consumer.
   *
   * <p>The kind of the startup position is probed in a fixed order (group offsets, earliest,
   * latest, specific offsets, date); the first kind that matches decides which {@code
   * setStartFrom*} method is invoked on the consumer.
   *
   * @param consumer the consumer whose start position is configured
   * @param startupPosition the startup position to apply
   * @throws IllegalStateException if the startup position matches none of the kinds probed here
   */
  private static <T> void configureStartupPosition(
      FlinkKafkaConsumer<T> consumer, KafkaIngressStartupPosition startupPosition) {
    if (startupPosition.isGroupOffsets()) {
      consumer.setStartFromGroupOffsets();
      return;
    }

    if (startupPosition.isEarliest()) {
      consumer.setStartFromEarliest();
      return;
    }

    if (startupPosition.isLatest()) {
      consumer.setStartFromLatest();
      return;
    }

    if (startupPosition.isSpecificOffsets()) {
      // Offsets are passed through convertKafkaTopicPartitionMap before being handed to the
      // consumer.
      consumer.setStartFromSpecificOffsets(
          convertKafkaTopicPartitionMap(startupPosition.asSpecificOffsets().specificOffsets()));
      return;
    }

    if (startupPosition.isDate()) {
      consumer.setStartFromTimestamp(startupPosition.asDate().epochMilli());
      return;
    }

    // None of the probed kinds matched.
    throw new IllegalStateException("Safe guard; should not occur");
  }