public void endInput()

in flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/sort/CoGroupOperator.java [167:234]


    public void endInput(int inputId) throws Exception {
        if (inputId == 1) {
            sorterA.finishReading();
            remainingInputNum--;
        } else if (inputId == 2) {
            sorterB.finishReading();
            remainingInputNum--;
        } else {
            throw new RuntimeException("Unknown inputId " + inputId);
        }

        if (remainingInputNum > 0) {
            return;
        }

        MutableObjectIterator<Tuple2<byte[], StreamRecord<IN1>>> iteratorA = sorterA.getIterator();
        MutableObjectIterator<Tuple2<byte[], StreamRecord<IN2>>> iteratorB = sorterB.getIterator();
        TypePairComparator<Tuple2<byte[], StreamRecord<IN1>>, Tuple2<byte[], StreamRecord<IN2>>>
                pairComparator =
                        (new RuntimePairComparatorFactory<
                                        Tuple2<byte[], StreamRecord<IN1>>,
                                        Tuple2<byte[], StreamRecord<IN2>>>())
                                .createComparator12(comparatorA, comparatorB);

        CoGroupTaskIterator<Tuple2<byte[], StreamRecord<IN1>>, Tuple2<byte[], StreamRecord<IN2>>>
                coGroupIterator;
        if (getExecutionConfig().isObjectReuseEnabled()) {
            coGroupIterator =
                    new ReusingSortMergeCoGroupIterator<
                            Tuple2<byte[], StreamRecord<IN1>>, Tuple2<byte[], StreamRecord<IN2>>>(
                            iteratorA,
                            iteratorB,
                            keyAndValueSerializerA,
                            comparatorA,
                            keyAndValueSerializerB,
                            comparatorB,
                            pairComparator);
        } else {
            coGroupIterator =
                    new NonReusingSortMergeCoGroupIterator<
                            Tuple2<byte[], StreamRecord<IN1>>, Tuple2<byte[], StreamRecord<IN2>>>(
                            iteratorA,
                            iteratorB,
                            keyAndValueSerializerA,
                            comparatorA,
                            keyAndValueSerializerB,
                            comparatorB,
                            pairComparator);
        }

        coGroupIterator.open();
        TupleUnwrappingIterator<IN1, byte[]> unWrappediteratorA = new TupleUnwrappingIterator<>();
        TupleUnwrappingIterator<IN2, byte[]> unWrappediteratorB = new TupleUnwrappingIterator<>();

        Output<OUT> timestampedCollector = new TimestampedCollector<>(output);
        while (coGroupIterator.next()) {
            unWrappediteratorA.set(coGroupIterator.getValues1().iterator());
            unWrappediteratorB.set(coGroupIterator.getValues2().iterator());
            userFunction.coGroup(unWrappediteratorA, unWrappediteratorB, timestampedCollector);
        }
        coGroupIterator.close();

        Watermark watermark = new Watermark(lastWatermarkTimestamp);
        if (getTimeServiceManager().isPresent()) {
            getTimeServiceManager().get().advanceWatermark(watermark);
        }
        output.emitWatermark(watermark);
    }