in flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java [58:82]
public void receive(Event[] events) {
StreamRecord<R> reusableRecord = new StreamRecord<>(null, 0L);
for (Event event : events) {
if (typeInfo == null || Map.class.isAssignableFrom(typeInfo.getTypeClass())) {
reusableRecord.replace(toMap(event), event.getTimestamp());
output.collect(reusableRecord);
} else if (typeInfo.isTupleType()) {
Tuple tuple = this.toTuple(event);
reusableRecord.replace(tuple, event.getTimestamp());
output.collect(reusableRecord);
} else if (typeInfo instanceof PojoTypeInfo) {
R obj;
try {
obj = objectMapper.convertValue(toMap(event), typeInfo.getTypeClass());
} catch (IllegalArgumentException ex) {
LOGGER.error("Failed to map event: " + event + " into type: " + typeInfo, ex);
throw ex;
}
reusableRecord.replace(obj, event.getTimestamp());
output.collect(reusableRecord);
} else {
throw new IllegalArgumentException("Unable to format " + event + " as type " + typeInfo);
}
}
}