in flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java [60:80]
public void receive(Event[] events) {
    /*
     * Turns every incoming event into a StreamRecord<R> and appends it to collectedRecords,
     * stamping the record with the event's own timestamp.
     *
     * The payload representation is chosen from typeInfo, checked in this order:
     *   1. typeInfo is null, or its type class is assignable to Map -> result of toMap(event)
     *   2. typeInfo reports a tuple type                            -> result of toTuple(event)
     *   3. typeInfo is a PojoTypeInfo -> toMap(event) converted by objectMapper into the type class
     *   4. anything else                                            -> IllegalArgumentException
     *
     * An IllegalArgumentException raised while building the POJO payload is logged and then
     * rethrown unchanged, so the caller still sees the original failure.
     */
    for (int idx = 0; idx < events.length; idx++) {
        final Event current = events[idx];
        R payload;

        if (typeInfo == null || Map.class.isAssignableFrom(typeInfo.getTypeClass())) {
            // Untyped or Map-typed output: hand the event over as a map.
            payload = (R) toMap(current);
        } else if (typeInfo.isTupleType()) {
            Tuple asTuple = this.toTuple(current);
            payload = (R) asTuple;
        } else if (typeInfo instanceof PojoTypeInfo) {
            try {
                // Go through the map form and let the object mapper bind it to the target class.
                payload = objectMapper.convertValue(toMap(current), typeInfo.getTypeClass());
            } catch (IllegalArgumentException mappingFailure) {
                LOGGER.error("Failed to map event: " + current + " into type: " + typeInfo, mappingFailure);
                throw mappingFailure;
            }
        } else {
            throw new IllegalArgumentException("Unable to format " + current + " as type " + typeInfo);
        }

        // Single emission point: the timestamp is read only after the payload was built.
        collectedRecords.add(new StreamRecord<R>(payload, current.getTimestamp()));
    }
}