public KafkaIngressStartupPosition deserialize()

in statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/binders/ingress/v1/RoutableKafkaIngressSpec.java [224:246]


    /**
     * Reads a JSON object describing a Kafka ingress startup position and converts it to a {@link
     * KafkaIngressStartupPosition}.
     *
     * <p>The object's {@code "type"} field selects the position and must be one of {@code
     * group-offsets}, {@code earliest}, {@code latest}, {@code specific-offsets} or {@code date}.
     * For {@code specific-offsets} and {@code date}, the whole object node is handed to the
     * corresponding parse helper to extract the additional settings.
     *
     * @param jsonParser parser positioned at the startup position JSON object
     * @param deserializationContext Jackson deserialization context (not used by this method)
     * @return the startup position selected by the {@code "type"} field
     * @throws IOException if the parser fails to read the JSON value
     * @throws IllegalArgumentException if the {@code "type"} field is missing or is not one of the
     *     valid values
     */
    public KafkaIngressStartupPosition deserialize(
        JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
      final ObjectNode startupPositionNode = jsonParser.readValueAs(ObjectNode.class);

      // ObjectNode#get returns null for an absent field; fail with a descriptive configuration
      // error here instead of letting the asText() call below throw a bare NullPointerException.
      if (!startupPositionNode.has("type")) {
        throw new IllegalArgumentException(
            "Missing required field 'type' for startup position"
                + "; valid values are [group-offsets, earliest, latest, specific-offsets, date]");
      }

      final String startupTypeString = startupPositionNode.get("type").asText();
      switch (startupTypeString) {
        case "group-offsets":
          return KafkaIngressStartupPosition.fromGroupOffsets();
        case "earliest":
          return KafkaIngressStartupPosition.fromEarliest();
        case "latest":
          return KafkaIngressStartupPosition.fromLatest();
        case "specific-offsets":
          return KafkaIngressStartupPosition.fromSpecificOffsets(
              parseSpecificStartupOffsetsMap(startupPositionNode));
        case "date":
          return KafkaIngressStartupPosition.fromDate(parseStartupDate(startupPositionNode));
        default:
          throw new IllegalArgumentException(
              "Invalid startup position type: "
                  + startupTypeString
                  + "; valid values are [group-offsets, earliest, latest, specific-offsets, date]");
      }
    }