private static DataStream<double[]> getBoundaryRange(DataStream<Tuple3<Double, Boolean, Double>> evalData)

in flink-ml-lib/src/main/java/org/apache/flink/ml/evaluation/binaryclassification/BinaryClassificationEvaluator.java [585:644]


    /**
     * Derives an array of boundary values from random samples of the {@code f0} field of the
     * evaluation data (that field is treated as the score in this method).
     *
     * <p>The work is done in two chained {@code DataStreamUtils.mapPartition} stages:
     *
     * <ol>
     *   <li>Each invocation buffers every {@code f0} value it is handed and emits one array of
     *       {@code NUM_SAMPLE_FOR_RANGE_PARTITION} values drawn uniformly at random, with
     *       replacement. When no values were seen, the array holds only {@link Double#MAX_VALUE}.
     *   <li>The sample arrays are concatenated, sorted ascending, and every {@code
     *       NUM_SAMPLE_FOR_RANGE_PARTITION}-th element (starting at index 0) is picked as a
     *       boundary. The transformation of this stage is set to parallelism 1.
     * </ol>
     *
     * <p>NOTE(review): the second stage sizes its buffer from the parallelism reported by the
     * first stage's stream, i.e. it presumably expects exactly one sample array per upstream
     * subtask -- confirm against {@code DataStreamUtils.mapPartition}.
     *
     * @param evalData Evaluation records; only {@code f0} is read here.
     * @return A stream whose emitted array has one boundary value per unit of the sampling
     *     stage's parallelism.
     */
    private static DataStream<double[]> getBoundaryRange(
            DataStream<Tuple3<Double, Boolean, Double>> evalData) {
        // Stage 1: one fixed-size random sample of scores per mapPartition invocation.
        DataStream<double[]> perPartitionSamples =
                DataStreamUtils.mapPartition(
                        evalData,
                        new MapPartitionFunction<Tuple3<Double, Boolean, Double>, double[]>() {
                            @Override
                            public void mapPartition(
                                    Iterable<Tuple3<Double, Boolean, Double>> dataPoints,
                                    Collector<double[]> out) {
                                // All scores are held in memory so that indices can be drawn
                                // uniformly once the total count is known.
                                List<Double> scores = new ArrayList<>();
                                dataPoints.forEach(point -> scores.add(point.f0));

                                final int total = scores.size();
                                double[] samples = new double[NUM_SAMPLE_FOR_RANGE_PARTITION];
                                if (total == 0) {
                                    // Nothing to sample from: emit MAX_VALUE placeholders, which
                                    // end up at the tail once the merged samples are sorted.
                                    Arrays.fill(samples, Double.MAX_VALUE);
                                } else {
                                    Random random = new Random();
                                    for (int slot = 0; slot < samples.length; slot++) {
                                        samples[slot] = scores.get(random.nextInt(total));
                                    }
                                }
                                out.collect(samples);
                            }
                        });

        // Captured here because the anonymous function below needs it to size its buffers.
        final int upstreamParallelism = perPartitionSamples.getParallelism();

        // Stage 2: merge all samples and pick evenly spaced boundaries.
        DataStream<double[]> boundaries =
                DataStreamUtils.mapPartition(
                        perPartitionSamples,
                        new MapPartitionFunction<double[], double[]>() {
                            @Override
                            public void mapPartition(
                                    Iterable<double[]> dataPoints, Collector<double[]> out) {
                                double[] merged =
                                        new double
                                                [upstreamParallelism
                                                        * NUM_SAMPLE_FOR_RANGE_PARTITION];
                                // Lay the incoming sample arrays out back to back.
                                int offset = 0;
                                for (double[] partitionSamples : dataPoints) {
                                    System.arraycopy(
                                            partitionSamples,
                                            0,
                                            merged,
                                            offset,
                                            NUM_SAMPLE_FOR_RANGE_PARTITION);
                                    offset += NUM_SAMPLE_FOR_RANGE_PARTITION;
                                }
                                Arrays.sort(merged);

                                // Take the first element of each consecutive block of
                                // NUM_SAMPLE_FOR_RANGE_PARTITION sorted samples.
                                double[] picked = new double[upstreamParallelism];
                                int pick = 0;
                                for (int idx = 0; idx < picked.length; idx++) {
                                    picked[idx] = merged[pick];
                                    pick += NUM_SAMPLE_FOR_RANGE_PARTITION;
                                }
                                out.collect(picked);
                            }
                        });
        // Run the merge step with parallelism 1.
        boundaries.getTransformation().setParallelism(1);
        return boundaries;
    }