in flink-taxi-stream-processor/src/main/java/com/amazonaws/flink/refarch/events/kinesis/Event.java [35:54]
/**
 * Parses a JSON-encoded event payload into its concrete {@link Event} subtype.
 *
 * <p>The payload is read as a JSON object; its {@code TYPE_FIELD} attribute is
 * removed from the object and used to select the target class, so the type
 * attribute itself is not passed on to Gson when the POJO is populated.
 *
 * <p>The bytes are decoded explicitly as UTF-8 rather than with the JVM's
 * platform default charset, so that non-ASCII characters in the payload are
 * handled the same way on every host.
 *
 * @param event the raw JSON payload (expected to be UTF-8 encoded)
 * @return a {@code WatermarkEvent} for type {@code "watermark"} or a
 *         {@code TripEvent} for type {@code "trip"}
 * @throws IllegalArgumentException if the payload has no type field or the
 *         type is neither {@code "watermark"} nor {@code "trip"}
 */
public static Event parseEvent(byte[] event) {
    // Parse the event payload; decode as UTF-8 instead of relying on the platform default.
    JsonReader jsonReader = new JsonReader(
            new InputStreamReader(
                    new ByteArrayInputStream(event),
                    java.nio.charset.StandardCharsets.UTF_8));
    JsonElement jsonElement = Streams.parse(jsonReader);

    // Remove the type attribute so it only acts as a discriminator.
    JsonElement labelJsonElement = jsonElement.getAsJsonObject().remove(TYPE_FIELD);
    if (labelJsonElement == null) {
        throw new IllegalArgumentException(
                "Event does not define a type field: "
                        + new String(event, java.nio.charset.StandardCharsets.UTF_8));
    }

    // Convert the JSON to a POJO, based on the type attribute.
    String eventType = labelJsonElement.getAsString();
    switch (eventType) {
        case "watermark":
            return gson.fromJson(jsonElement, WatermarkEvent.class);
        case "trip":
            return gson.fromJson(jsonElement, TripEvent.class);
        default:
            throw new IllegalArgumentException("Found unsupported event type: " + eventType);
    }
}