in statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java [96:120]
/**
 * Writes the Kinesis consumer settings that correspond to the given ingress startup position.
 *
 * <p>An earliest position is recorded as {@code TRIM_HORIZON}, a latest position as {@code
 * LATEST}, and a date position as {@code AT_TIMESTAMP} together with the date rendered using
 * {@link ConsumerConfigConstants#DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT}.
 *
 * @param properties consumer properties, updated in place
 * @param startupPosition the startup position to translate into consumer properties
 * @throws IllegalStateException if the position is none of earliest, latest, or date
 */
private static void setStartupPositionProperties(
    Properties properties, KinesisIngressStartupPosition startupPosition) {
  final String positionKey = ConsumerConfigConstants.STREAM_INITIAL_POSITION;

  if (startupPosition.isEarliest()) {
    properties.setProperty(
        positionKey, ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name());
    return;
  }

  if (startupPosition.isLatest()) {
    properties.setProperty(positionKey, ConsumerConfigConstants.InitialPosition.LATEST.name());
    return;
  }

  // Anything that is not a date position at this point is a type this method does not know.
  if (!startupPosition.isDate()) {
    throw new IllegalStateException(
        "Unrecognized ingress startup position type: " + startupPosition);
  }

  properties.setProperty(
      positionKey, ConsumerConfigConstants.InitialPosition.AT_TIMESTAMP.name());

  // The timestamp is written as text, using the connector's default timestamp pattern.
  final ZonedDateTime initialDate = startupPosition.asDate().date();
  final DateTimeFormatter timestampFormat =
      DateTimeFormatter.ofPattern(ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT);
  final String initialTimestamp = initialDate.format(timestampFormat);
  properties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, initialTimestamp);
}