in flink-ml-lib/src/main/java/org/apache/flink/ml/feature/univariatefeatureselector/UnivariateFeatureSelector.java [202:262]
/**
 * Selects feature indices from the collected (p-value, feature index) pairs according to
 * {@code selectionMode} and {@code threshold}. The selection is emitted as a single
 * {@link UnivariateFeatureSelectorModelData} record.
 *
 * <p>For NUM_TOP_FEATURES, PERCENTILE and FDR, {@code pValuesAndIndices} is sorted in place by
 * ascending p-value, with ties broken by ascending feature index. The selected indices are
 * emitted in that sorted order. For FPR and FWE, the pairs are filtered in their current list
 * order.
 *
 * @throws RuntimeException if {@code selectionMode} is not one of the handled modes
 */
public void endInput() {
    // Ascending p-value, then ascending feature index, so that ties are resolved
    // deterministically. Shared by every mode that needs a ranking.
    Comparator<Tuple2<Double, Integer>> byPValueThenIndex =
            Comparator.comparingDouble((Tuple2<Double, Integer> t) -> t.f0)
                    .thenComparingInt(t -> t.f1);
    int numFeatures = pValuesAndIndices.size();
    List<Integer> indices = new ArrayList<>();
    switch (selectionMode) {
        case NUM_TOP_FEATURES:
            {
                // Keep the (int) threshold lowest p-values, capped at the number of features.
                pValuesAndIndices.sort(byPValueThenIndex);
                int numSelected = Math.min(numFeatures, (int) threshold);
                for (int i = 0; i < numSelected; i++) {
                    indices.add(pValuesAndIndices.get(i).f1);
                }
                break;
            }
        case PERCENTILE:
            {
                // Keep the lowest-p-value fraction `threshold` of all features (truncated).
                pValuesAndIndices.sort(byPValueThenIndex);
                int numSelected = Math.min(numFeatures, (int) (numFeatures * threshold));
                for (int i = 0; i < numSelected; i++) {
                    indices.add(pValuesAndIndices.get(i).f1);
                }
                break;
            }
        case FPR:
            // Keep every feature whose p-value is strictly below the threshold.
            for (Tuple2<Double, Integer> pValueAndIndex : pValuesAndIndices) {
                if (pValueAndIndex.f0 < threshold) {
                    indices.add(pValueAndIndex.f1);
                }
            }
            break;
        case FDR:
            {
                // Benjamini-Hochberg step-up: find the largest rank k (1-based) such that the
                // k-th smallest p-value is < (threshold / n) * k, then keep the k smallest.
                // Scanning from the highest rank down lets us stop at the first match. The
                // list is sorted once here and reused for the selection below.
                pValuesAndIndices.sort(byPValueThenIndex);
                int numSelected = 0;
                for (int i = numFeatures - 1; i >= 0; i--) {
                    if (pValuesAndIndices.get(i).f0 < (threshold / numFeatures) * (i + 1)) {
                        numSelected = i + 1;
                        break;
                    }
                }
                for (int i = 0; i < numSelected; i++) {
                    indices.add(pValuesAndIndices.get(i).f1);
                }
                break;
            }
        case FWE:
            // Bonferroni-style cut-off: p-value strictly below threshold / n.
            for (Tuple2<Double, Integer> pValueAndIndex : pValuesAndIndices) {
                if (pValueAndIndex.f0 < threshold / pValuesAndIndices.size()) {
                    indices.add(pValueAndIndex.f1);
                }
            }
            break;
        default:
            throw new RuntimeException("Unknown Selection Mode: " + selectionMode);
    }
    UnivariateFeatureSelectorModelData modelData =
            new UnivariateFeatureSelectorModelData(
                    indices.stream().mapToInt(Integer::intValue).toArray());
    output.collect(new StreamRecord<>(modelData));
}