in statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/binders/ingress/v1/RoutableKafkaIngressSpec.java [224:246]
public KafkaIngressStartupPosition deserialize(
    JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
  // Reads the current JSON value as an object and maps its "type" field to the
  // corresponding KafkaIngressStartupPosition factory method.
  //
  // Throws IOException if the parser fails to read the value as an ObjectNode.
  // Throws IllegalArgumentException if the "type" field is absent or is not one of
  // [group-offsets, earliest, latest, specific-offsets, date].
  final ObjectNode startupPositionNode = jsonParser.readValueAs(ObjectNode.class);

  // ObjectNode#get returns null for an absent field; fail with a descriptive message
  // instead of letting the chained asText() call raise a bare NullPointerException.
  if (!startupPositionNode.has("type")) {
    throw new IllegalArgumentException(
        "Missing required field 'type' for startup position"
            + "; valid values are [group-offsets, earliest, latest, specific-offsets, date]");
  }

  final String startupTypeString = startupPositionNode.get("type").asText();
  switch (startupTypeString) {
    case "group-offsets":
      return KafkaIngressStartupPosition.fromGroupOffsets();
    case "earliest":
      return KafkaIngressStartupPosition.fromEarliest();
    case "latest":
      return KafkaIngressStartupPosition.fromLatest();
    case "specific-offsets":
      // The offsets themselves are extracted from the same node by a sibling helper.
      return KafkaIngressStartupPosition.fromSpecificOffsets(
          parseSpecificStartupOffsetsMap(startupPositionNode));
    case "date":
      // The date is extracted from the same node by a sibling helper.
      return KafkaIngressStartupPosition.fromDate(parseStartupDate(startupPositionNode));
    default:
      throw new IllegalArgumentException(
          "Invalid startup position type: "
              + startupTypeString
              + "; valid values are [group-offsets, earliest, latest, specific-offsets, date]");
  }
}