public void receive()

in flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java [60:80]


    /**
     * Wraps every incoming event in a {@code StreamRecord} stamped with the event's
     * timestamp and appends it to {@code collectedRecords}.
     *
     * <p>The record payload is chosen from {@code typeInfo}, checked in this order:
     * <ol>
     *   <li>no type information, or a type class assignable to {@link Map}:
     *       the result of {@code toMap(event)};</li>
     *   <li>a tuple type: the result of {@code toTuple(event)};</li>
     *   <li>a {@code PojoTypeInfo}: the result of {@code toMap(event)} converted to the
     *       type class through {@code objectMapper}.</li>
     * </ol>
     *
     * @param events the events to convert and collect
     * @throws IllegalArgumentException if the POJO conversion fails (the failure is logged
     *         and the same exception rethrown), or if {@code typeInfo} matches none of
     *         the cases above
     */
    public void receive(Event[] events) {
        for (Event current : events) {
            // Resolve the payload first; every supported branch then shares one add() below.
            final R payload;

            if (typeInfo == null || Map.class.isAssignableFrom(typeInfo.getTypeClass())) {
                payload = (R) toMap(current);
            } else if (typeInfo.isTupleType()) {
                Tuple asTuple = this.toTuple(current);
                payload = (R) asTuple;
            } else if (typeInfo instanceof PojoTypeInfo) {
                try {
                    payload = objectMapper.convertValue(toMap(current), typeInfo.getTypeClass());
                } catch (IllegalArgumentException conversionFailure) {
                    // Log with the offending event for diagnosis, then let the caller see the failure.
                    LOGGER.error("Failed to map event: " + current + " into type: " + typeInfo, conversionFailure);
                    throw conversionFailure;
                }
            } else {
                throw new IllegalArgumentException("Unable to format " + current + " as type " + typeInfo);
            }

            // The timestamp is read only after the payload was built, as before.
            collectedRecords.add(new StreamRecord<R>(payload, current.getTimestamp()));
        }
    }