public void endInput()

in flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/HeadOperator.java [366:416]


    public void endInput() throws Exception {
        // Called when the task input has ended. The method does not return until the operator
        // status becomes TERMINATED; while waiting it keeps draining the mailbox and, during
        // the first phase, forwards checkpoint barriers seen on the single input channel.

        // While still RUNNING, push an epoch-watermark record through the record processor.
        if (status == HeadOperatorStatus.RUNNING) {
            recordProcessor.processElement(
                    new StreamRecord<>(IterationRecord.newEpochWatermark(0, "fake")));
        }

        // This method blocks, so barriers arriving from the task input would otherwise go
        // unprocessed and upstream tasks could not complete their final checkpoint. As a
        // temporary workaround, the input channel is inspected directly and every checkpoint
        // found there is triggered until the EndOfPartitionEvent shows up.
        // The workaround only supports exactly one input gate holding exactly one channel.
        checkState(getContainingTask().getEnvironment().getAllInputGates().length == 1);
        checkState(
                getContainingTask()
                                .getEnvironment()
                                .getAllInputGates()[0]
                                .getNumberOfInputChannels()
                        == 1);
        InputChannel soleChannel =
                getContainingTask().getEnvironment().getAllInputGates()[0].getChannel(0);

        // Phase 1: poll the channel until the partition ends or the operator terminates.
        long newestTriggeredId = 0;
        boolean partitionEnded = false;
        while (!(partitionEnded || status == HeadOperatorStatus.TERMINATED)) {
            mailboxExecutor.yield(200, TimeUnit.MILLISECONDS);

            for (AbstractEvent channelEvent : parseInputChannelEvents(soleChannel)) {
                if (channelEvent instanceof CheckpointBarrier) {
                    CheckpointBarrier barrier = (CheckpointBarrier) channelEvent;
                    long barrierId = barrier.getId();
                    if (barrierId <= newestTriggeredId) {
                        // This checkpoint (or a newer one) has already been triggered.
                        continue;
                    }

                    CheckpointMetaData metaData =
                            new CheckpointMetaData(barrierId, barrier.getTimestamp());
                    getContainingTask()
                            .triggerCheckpointAsync(metaData, barrier.getCheckpointOptions());
                    newestTriggeredId = barrierId;
                    continue;
                }

                if (channelEvent instanceof EndOfPartitionEvent) {
                    // Keep scanning the rest of this batch; the outer loop exits afterwards.
                    partitionEnded = true;
                }
            }
        }

        // Phase 2: nothing left to poll, so simply run mailbox actions until termination.
        while (status != HeadOperatorStatus.TERMINATED) {
            mailboxExecutor.yield();
        }
    }