private static void setStartupPositionProperties()

in statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java [96:120]


  /**
   * Writes the Kinesis consumer settings that correspond to the given ingress startup position.
   *
   * <ul>
   *   <li>earliest: {@code STREAM_INITIAL_POSITION} is set to {@code TRIM_HORIZON}
   *   <li>latest: {@code STREAM_INITIAL_POSITION} is set to {@code LATEST}
   *   <li>date: {@code STREAM_INITIAL_POSITION} is set to {@code AT_TIMESTAMP}, and {@code
   *       STREAM_INITIAL_TIMESTAMP} is set to the date rendered with {@code
   *       DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT}
   * </ul>
   *
   * @param properties the properties object to populate; it is modified in place
   * @param startupPosition the startup position to translate into consumer properties
   * @throws IllegalStateException if the position is neither earliest, latest, nor a date
   */
  private static void setStartupPositionProperties(
      Properties properties, KinesisIngressStartupPosition startupPosition) {
    final String positionKey = ConsumerConfigConstants.STREAM_INITIAL_POSITION;

    if (startupPosition.isEarliest()) {
      properties.setProperty(
          positionKey, ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name());
      return;
    }

    if (startupPosition.isLatest()) {
      properties.setProperty(positionKey, ConsumerConfigConstants.InitialPosition.LATEST.name());
      return;
    }

    // Anything that is not a date at this point is a position type this method does not know.
    if (!startupPosition.isDate()) {
      throw new IllegalStateException(
          "Unrecognized ingress startup position type: " + startupPosition);
    }

    // The position property is written before the timestamp is resolved and formatted.
    properties.setProperty(
        positionKey, ConsumerConfigConstants.InitialPosition.AT_TIMESTAMP.name());

    final ZonedDateTime initialTimestamp = startupPosition.asDate().date();
    final DateTimeFormatter timestampFormat =
        DateTimeFormatter.ofPattern(ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT);
    final String renderedTimestamp = timestampFormat.format(initialTimestamp);
    properties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, renderedTimestamp);
  }