public static Event parseEvent()

in flink-taxi-stream-processor/src/main/java/com/amazonaws/flink/refarch/events/kinesis/Event.java [35:54]


  public static Event parseEvent(byte[] event) {
    //parse the event payload and remove the type attribute
    JsonReader jsonReader =  new JsonReader(new InputStreamReader(new ByteArrayInputStream(event)));
    JsonElement jsonElement = Streams.parse(jsonReader);
    JsonElement labelJsonElement = jsonElement.getAsJsonObject().remove(TYPE_FIELD);

    if (labelJsonElement == null) {
      throw new IllegalArgumentException("Event does not define a type field: " + new String(event));
    }

    //convert json to POJO, based on the type attribute
    switch (labelJsonElement.getAsString()) {
      case "watermark":
        return gson.fromJson(jsonElement, WatermarkEvent.class);
      case "trip":
        return gson.fromJson(jsonElement, TripEvent.class);
      default:
        throw new IllegalArgumentException("Found unsupported event type: " + labelJsonElement.getAsString());
    }
  }