in flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/HeadOperator.java [437:479]
private List<AbstractEvent> parseInputChannelEvents(InputChannel inputChannel)
throws Exception {
List<AbstractEvent> events = new ArrayList<>();
if (inputChannel instanceof RemoteInputChannel) {
Class<?> seqBufferClass =
Class.forName(
"org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel$SequenceBuffer");
PrioritizedDeque<?> queue =
ReflectionUtils.getFieldValue(
inputChannel, RemoteInputChannel.class, "receivedBuffers");
for (Object sequenceBuffer : queue) {
Buffer buffer =
ReflectionUtils.getFieldValue(sequenceBuffer, seqBufferClass, "buffer");
if (!buffer.isBuffer()) {
events.add(EventSerializer.fromBuffer(buffer, getClass().getClassLoader()));
}
}
} else if (inputChannel instanceof LocalInputChannel) {
PipelinedSubpartitionView subpartitionView =
ReflectionUtils.getFieldValue(
inputChannel, LocalInputChannel.class, "subpartitionView");
PipelinedSubpartition pipelinedSubpartition =
ReflectionUtils.getFieldValue(
subpartitionView, PipelinedSubpartitionView.class, "parent");
PrioritizedDeque<BufferConsumerWithPartialRecordLength> queue =
ReflectionUtils.getFieldValue(
pipelinedSubpartition, PipelinedSubpartition.class, "buffers");
synchronized (queue) {
for (BufferConsumerWithPartialRecordLength bufferConsumer : queue) {
if (!bufferConsumer.getBufferConsumer().isBuffer()) {
events.add(
EventSerializer.fromBuffer(
bufferConsumer.getBufferConsumer().copy().build(),
getClass().getClassLoader()));
}
}
}
} else {
LOG.warn("Unknown input channel type: " + inputChannel);
}
return events;
}