public void endInput()

in flink-ml-lib/src/main/java/org/apache/flink/ml/feature/univariatefeatureselector/UnivariateFeatureSelector.java [202:262]


        public void endInput() {
            // Picks feature indices out of pValuesAndIndices (pairs whose f0 is compared against
            // the threshold and whose f1 is the value emitted; by name, f0 is a p-value and f1 a
            // feature index) according to selectionMode, then emits exactly one model-data record
            // holding the chosen indices.
            //
            // Order of the emitted indices:
            //   - NUM_TOP_FEATURES, PERCENTILE, FDR: ascending f0, ties broken by ascending f1.
            //   - FPR, FWE: the current order of pValuesAndIndices (no sort is performed).
            //
            // Throws RuntimeException when selectionMode is none of the five handled constants.
            final List<Integer> indices = new ArrayList<>();
            final int numCandidates = pValuesAndIndices.size();

            // Single shared ordering for every mode that ranks candidates.
            final Comparator<Tuple2<Double, Integer>> byPValueThenIndex =
                    Comparator.comparingDouble((Tuple2<Double, Integer> t) -> t.f0)
                            .thenComparingInt(t -> t.f1);

            switch (selectionMode) {
                case NUM_TOP_FEATURES:
                    {
                        pValuesAndIndices.sort(byPValueThenIndex);
                        // threshold is truncated to an int count; a non-positive count selects
                        // nothing, and the count is capped at the number of candidates.
                        final int limit = Math.min(numCandidates, (int) threshold);
                        for (int i = 0; i < limit; i++) {
                            indices.add(pValuesAndIndices.get(i).f1);
                        }
                        break;
                    }
                case PERCENTILE:
                    {
                        pValuesAndIndices.sort(byPValueThenIndex);
                        // threshold is a fraction of the candidate count, truncated toward zero.
                        final int limit =
                                Math.min(numCandidates, (int) (pValuesAndIndices.size() * threshold));
                        for (int i = 0; i < limit; i++) {
                            indices.add(pValuesAndIndices.get(i).f1);
                        }
                        break;
                    }
                case FPR:
                    // Keep every candidate whose f0 is strictly below the threshold.
                    for (Tuple2<Double, Integer> candidate : pValuesAndIndices) {
                        if (candidate.f0 < threshold) {
                            indices.add(candidate.f1);
                        }
                    }
                    break;
                case FDR:
                    {
                        pValuesAndIndices.sort(byPValueThenIndex);

                        // Find the last rank i (0-based) whose f0 is strictly below
                        // (threshold / n) * (i + 1); every candidate up to and including that
                        // rank is selected, even those that individually failed the test.
                        int maxIndex = -1;
                        for (int i = 0; i < numCandidates; i++) {
                            if (pValuesAndIndices.get(i).f0
                                    < (threshold / pValuesAndIndices.size()) * (i + 1)) {
                                // i only grows, so the latest match is always the maximum.
                                maxIndex = i;
                            }
                        }
                        // The list is still sorted from above, so no second sort is needed.
                        for (int i = 0; i <= maxIndex; i++) {
                            indices.add(pValuesAndIndices.get(i).f1);
                        }
                        break;
                    }
                case FWE:
                    // Keep every candidate whose f0 is strictly below threshold / n.
                    for (Tuple2<Double, Integer> candidate : pValuesAndIndices) {
                        if (candidate.f0 < threshold / pValuesAndIndices.size()) {
                            indices.add(candidate.f1);
                        }
                    }
                    break;
                default:
                    throw new RuntimeException("Unknown Selection Mode: " + selectionMode);
            }

            UnivariateFeatureSelectorModelData modelData =
                    new UnivariateFeatureSelectorModelData(
                            indices.stream().mapToInt(Integer::intValue).toArray());
            output.collect(new StreamRecord<>(modelData));
        }