in flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/HeadOperator.java [366:416]
public void endInput() throws Exception {
if (status == HeadOperatorStatus.RUNNING) {
recordProcessor.processElement(
new StreamRecord<>(IterationRecord.newEpochWatermark(0, "fake")));
}
// Since we choose to block here, we could not continue to process the barriers received
// from the task inputs, which would block the precedent tasks from finishing since
// they need to complete their final checkpoint. This is a temporary solution to this issue
// that we will check the input channels, trigger all the checkpoints until we see
// the EndOfPartitionEvent.
checkState(getContainingTask().getEnvironment().getAllInputGates().length == 1);
checkState(
getContainingTask()
.getEnvironment()
.getAllInputGates()[0]
.getNumberOfInputChannels()
== 1);
InputChannel inputChannel =
getContainingTask().getEnvironment().getAllInputGates()[0].getChannel(0);
boolean endOfPartitionReceived = false;
long lastTriggerCheckpointId = 0;
while (!endOfPartitionReceived && status != HeadOperatorStatus.TERMINATED) {
mailboxExecutor.yield(200, TimeUnit.MILLISECONDS);
List<AbstractEvent> events = parseInputChannelEvents(inputChannel);
for (AbstractEvent event : events) {
if (event instanceof CheckpointBarrier) {
CheckpointBarrier barrier = (CheckpointBarrier) event;
if (barrier.getId() > lastTriggerCheckpointId) {
getContainingTask()
.triggerCheckpointAsync(
new CheckpointMetaData(
barrier.getId(), barrier.getTimestamp()),
barrier.getCheckpointOptions());
lastTriggerCheckpointId = barrier.getId();
}
} else if (event instanceof EndOfPartitionEvent) {
endOfPartitionReceived = true;
}
}
}
// By here we could step into the normal loop.
while (status != HeadOperatorStatus.TERMINATED) {
mailboxExecutor.yield();
}
}