public void receive()

in flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java [58:82]


    /**
     * Converts each event of the batch into the configured output type and emits it downstream.
     *
     * <p>The target representation is selected per event from {@code typeInfo}:
     * <ul>
     *   <li>no type information, or a {@link Map}-compatible type class: the result of {@code toMap};</li>
     *   <li>a tuple type: the result of {@code toTuple};</li>
     *   <li>a {@code PojoTypeInfo}: the map form converted to the type class via {@code objectMapper}.</li>
     * </ul>
     *
     * <p>A single {@code StreamRecord} instance is allocated per call and re-filled for every
     * event before being handed to {@code output.collect}.
     *
     * @param events the events to convert and emit, processed in array order
     * @throws IllegalArgumentException if the POJO conversion fails (logged, then rethrown) or if
     *                                  {@code typeInfo} is none of the supported kinds
     */
    public void receive(Event[] events) {
        final StreamRecord<R> outRecord = new StreamRecord<>(null, 0L);

        for (int idx = 0; idx < events.length; idx++) {
            final Event current = events[idx];

            // Step 1: build the payload in whichever shape the configured type asks for.
            Object payload;
            if (typeInfo == null || Map.class.isAssignableFrom(typeInfo.getTypeClass())) {
                payload = toMap(current);
            } else if (typeInfo.isTupleType()) {
                payload = this.toTuple(current);
            } else if (typeInfo instanceof PojoTypeInfo) {
                try {
                    payload = objectMapper.convertValue(toMap(current), typeInfo.getTypeClass());
                } catch (IllegalArgumentException conversionFailure) {
                    LOGGER.error("Failed to map event: " + current + " into type: " + typeInfo, conversionFailure);
                    throw conversionFailure;
                }
            } else {
                throw new IllegalArgumentException("Unable to format " + current + " as type " + typeInfo);
            }

            // Step 2: stamp the shared record with the payload and the event's timestamp, then emit.
            outRecord.replace(payload, current.getTimestamp());
            output.collect(outRecord);
        }
    }