in flink-ml-lib/src/main/java/org/apache/flink/ml/evaluation/binaryclassification/BinaryClassificationEvaluator.java [585:644]
/**
 * Estimates the boundaries used to range-partition the evaluation data by its first field
 * ({@code f0}, treated here as the score).
 *
 * <p>Stage one runs on every partition of {@code evalData}: it draws
 * {@code NUM_SAMPLE_FOR_RANGE_PARTITION} values of {@code f0} uniformly at random with
 * replacement. A partition without any record emits {@code Double.MAX_VALUE} placeholders
 * instead. Stage two runs with parallelism 1: it merges and sorts all sample arrays and picks
 * evenly spaced values as boundaries.
 *
 * @param evalData stream of records whose {@code f0} field is sampled.
 * @return a stream with parallelism 1 that emits the boundary array; the array length equals the
 *     parallelism of the sampling stage as known when the job graph is built.
 */
private static DataStream<double[]> getBoundaryRange(
        DataStream<Tuple3<Double, Boolean, Double>> evalData) {
    DataStream<double[]> sampleScoreStream =
            DataStreamUtils.mapPartition(
                    evalData,
                    new MapPartitionFunction<Tuple3<Double, Boolean, Double>, double[]>() {
                        @Override
                        public void mapPartition(
                                Iterable<Tuple3<Double, Boolean, Double>> dataPoints,
                                Collector<double[]> out) {
                            // The partition size is unknown up front, so the scores are
                            // buffered before sampling from them.
                            List<Double> bufferedScores = new ArrayList<>();
                            for (Tuple3<Double, Boolean, Double> dataPoint : dataPoints) {
                                bufferedScores.add(dataPoint.f0);
                            }

                            double[] sampleScores = new double[NUM_SAMPLE_FOR_RANGE_PARTITION];
                            int sampleNum = bufferedScores.size();
                            if (sampleNum == 0) {
                                // Empty partition: emit placeholders that sort to the end.
                                Arrays.fill(sampleScores, Double.MAX_VALUE);
                            } else {
                                // Uniform sampling with replacement.
                                Random rand = new Random();
                                for (int i = 0; i < NUM_SAMPLE_FOR_RANGE_PARTITION; ++i) {
                                    sampleScores[i] = bufferedScores.get(rand.nextInt(sampleNum));
                                }
                            }
                            out.collect(sampleScores);
                        }
                    });

    final int parallel = sampleScoreStream.getParallelism();
    DataStream<double[]> boundaryRange =
            DataStreamUtils.mapPartition(
                    sampleScoreStream,
                    new MapPartitionFunction<double[], double[]>() {
                        @Override
                        public void mapPartition(
                                Iterable<double[]> dataPoints, Collector<double[]> out) {
                            // Start with the expected capacity, but do not rely on exactly
                            // `parallel` arrays arriving: grow on demand instead of overflowing,
                            // and track how many slots were really filled so that unused
                            // (zero) slots never take part in the sort.
                            double[] allSampleScore =
                                    new double[parallel * NUM_SAMPLE_FOR_RANGE_PARTITION];
                            int total = 0;
                            for (double[] dataPoint : dataPoints) {
                                int required = total + dataPoint.length;
                                if (required > allSampleScore.length) {
                                    allSampleScore =
                                            Arrays.copyOf(
                                                    allSampleScore,
                                                    Math.max(allSampleScore.length * 2, required));
                                }
                                System.arraycopy(
                                        dataPoint, 0, allSampleScore, total, dataPoint.length);
                                total = required;
                            }
                            // Sort only the filled prefix.
                            Arrays.sort(allSampleScore, 0, total);

                            double[] boundaries = new double[parallel];
                            // Without any sample the boundaries stay at 0.0, as before.
                            if (total > 0) {
                                for (int i = 0; i < parallel; ++i) {
                                    // Evenly spaced pick over the received samples. This is
                                    // i * NUM_SAMPLE_FOR_RANGE_PARTITION when exactly `parallel`
                                    // arrays arrived; long math avoids int overflow.
                                    int index = (int) ((long) i * total / parallel);
                                    boundaries[i] = allSampleScore[index];
                                }
                            }
                            out.collect(boundaries);
                        }
                    });
    // All sample arrays must be merged by one task to obtain global boundaries.
    boundaryRange.getTransformation().setParallelism(1);
    return boundaryRange;
}