private List parseInputChannelEvents()

in flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/HeadOperator.java [437:479]


    /**
     * Extracts the events that are currently held in the internal buffer queue of the given input
     * channel.
     *
     * <p>The queues are private fields of the runtime channel classes, so they are read through
     * {@link ReflectionUtils}. Only queue entries that are not data buffers (i.e. {@code
     * isBuffer()} is {@code false}) are deserialized into events.
     *
     * @param inputChannel the input channel whose queued events should be collected.
     * @return the deserialized events in queue iteration order; empty if the queue holds no events
     *     or if the channel type is neither remote nor local (a warning is logged in that case).
     * @throws Exception propagated from the reflective lookups or from the event deserialization.
     */
    private List<AbstractEvent> parseInputChannelEvents(InputChannel inputChannel)
            throws Exception {
        List<AbstractEvent> events = new ArrayList<>();
        // Loop-invariant: the same class loader is used to deserialize every event.
        ClassLoader classLoader = getClass().getClassLoader();

        if (inputChannel instanceof RemoteInputChannel) {
            // SequenceBuffer is a nested class that cannot be referenced directly, so it is
            // resolved by name.
            Class<?> seqBufferClass =
                    Class.forName(
                            "org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel$SequenceBuffer");
            PrioritizedDeque<?> queue =
                    ReflectionUtils.getFieldValue(
                            inputChannel, RemoteInputChannel.class, "receivedBuffers");
            // Iterate under the queue's monitor, exactly as the local-channel branch below does,
            // so that a concurrent append cannot break the iteration.
            // NOTE(review): RemoteInputChannel appears to guard receivedBuffers with this same
            // monitor - confirm against the Flink version in use.
            synchronized (queue) {
                for (Object sequenceBuffer : queue) {
                    Buffer buffer =
                            ReflectionUtils.getFieldValue(
                                    sequenceBuffer, seqBufferClass, "buffer");
                    if (!buffer.isBuffer()) {
                        events.add(EventSerializer.fromBuffer(buffer, classLoader));
                    }
                }
            }
        } else if (inputChannel instanceof LocalInputChannel) {
            // A local channel reads from the producer's subpartition; walk view -> parent
            // subpartition -> its buffer queue.
            PipelinedSubpartitionView subpartitionView =
                    ReflectionUtils.getFieldValue(
                            inputChannel, LocalInputChannel.class, "subpartitionView");
            PipelinedSubpartition pipelinedSubpartition =
                    ReflectionUtils.getFieldValue(
                            subpartitionView, PipelinedSubpartitionView.class, "parent");
            PrioritizedDeque<BufferConsumerWithPartialRecordLength> queue =
                    ReflectionUtils.getFieldValue(
                            pipelinedSubpartition, PipelinedSubpartition.class, "buffers");
            synchronized (queue) {
                for (BufferConsumerWithPartialRecordLength bufferConsumer : queue) {
                    if (!bufferConsumer.getBufferConsumer().isBuffer()) {
                        // The event is read from a copy of the queued consumer rather than from
                        // the queued consumer itself.
                        // NOTE(review): the copied consumer and the built buffer are not
                        // explicitly released here - confirm whether they should be recycled.
                        events.add(
                                EventSerializer.fromBuffer(
                                        bufferConsumer.getBufferConsumer().copy().build(),
                                        classLoader));
                    }
                }
            }
        } else {
            LOG.warn("Unknown input channel type: " + inputChannel);
        }

        return events;
    }