flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/Iterations.java [476:499]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    private static DataStreamList addTails(
            DataStreamList dataStreams, IterationID iterationId, int startIndex) {
        return new DataStreamList(
                map(
                        dataStreams,
                        (index, dataStream) -> {
                            Transformation<?> inputTransformation = dataStream.getTransformation();
                            if (!(inputTransformation instanceof PhysicalTransformation)
                                    && inputTransformation.getInputs().size() > 1) {
                                // TODO: Support epoch watermark alignment for TailOperator.
                                throw new UnsupportedOperationException(
                                        "Tail operator should have only one input. Please check whether operator \""
                                                + inputTransformation.getName()
                                                + "\" contains multiple inputs.");
                            }

                            return ((DataStream<IterationRecord<?>>) dataStream)
                                    .transform(
                                            "tail-" + dataStream.getTransformation().getName(),
                                            new IterationRecordTypeInfo(dataStream.getType()),
                                            new TailOperator(iterationId, startIndex + index))
                                    .setParallelism(dataStream.getParallelism());
                        }));
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink-ml-iteration/flink-ml-iteration-1.15/src/main/java/org/apache/flink/iteration/Iterations.java [476:499]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    private static DataStreamList addTails(
            DataStreamList dataStreams, IterationID iterationId, int startIndex) {
        return new DataStreamList(
                map(
                        dataStreams,
                        (index, dataStream) -> {
                            Transformation<?> inputTransformation = dataStream.getTransformation();
                            if (!(inputTransformation instanceof PhysicalTransformation)
                                    && inputTransformation.getInputs().size() > 1) {
                                // TODO: Support epoch watermark alignment for TailOperator.
                                throw new UnsupportedOperationException(
                                        "Tail operator should have only one input. Please check whether operator \""
                                                + inputTransformation.getName()
                                                + "\" contains multiple inputs.");
                            }

                            return ((DataStream<IterationRecord<?>>) dataStream)
                                    .transform(
                                            "tail-" + dataStream.getTransformation().getName(),
                                            new IterationRecordTypeInfo(dataStream.getType()),
                                            new TailOperator(iterationId, startIndex + index))
                                    .setParallelism(dataStream.getParallelism());
                        }));
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



