in flink-ml-lib/src/main/java/org/apache/flink/ml/feature/bucketizer/Bucketizer.java [105:151]
public void flatMap(Row value, Collector<Row> out) {
    // Maps every configured input column of `value` to a bucket index (stored as a Double)
    // and writes it to position `value.getArity() + column` of the output row. The row is
    // emitted once all columns are processed; it is dropped silently when an invalid value
    // is met under SKIP_INVALID, and a RuntimeException is thrown under ERROR_INVALID.
    // NOTE(review): cloneWithReservedFields presumably copies `value` and appends
    // inputCols.length empty trailing fields -- confirm against RowUtils.
    final int numCols = inputCols.length;
    Row outputRow = RowUtils.cloneWithReservedFields(value, numCols);

    for (int col = 0; col < numCols; col++) {
        double feature = ((Number) value.getField(inputCols[col])).doubleValue();
        Double[] boundaries = splitsArray[col];

        // `inRange` stays false for NaN and for values outside [first split, last split].
        boolean inRange = false;
        int bucket = 0;
        if (!Double.isNaN(feature)) {
            int found = Arrays.binarySearch(boundaries, feature);
            if (found >= 0) {
                // Exact hit on a split point: a hit on the last split belongs to the
                // preceding (last) bucket, any other hit opens the bucket it starts.
                bucket = (found == boundaries.length - 1) ? found - 1 : found;
                inRange = true;
            } else {
                // binarySearch returns -(insertionPoint) - 1 when the key is absent.
                int insertionPoint = -found - 1;
                if (insertionPoint != 0 && insertionPoint != boundaries.length) {
                    bucket = insertionPoint - 1;
                    inRange = true;
                }
            }
        }

        int outputPos = value.getArity() + col;
        if (inRange) {
            outputRow.setField(outputPos, (double) bucket);
            continue;
        }

        // Invalid feature: behaviour is selected by the handleInvalid setting.
        switch (handleInvalid) {
            case ERROR_INVALID:
                throw new RuntimeException(
                        "The input contains invalid value. See "
                                + HANDLE_INVALID
                                + " parameter for more options.");
            case SKIP_INVALID:
                // Drop the whole row: nothing is collected.
                return;
            case KEEP_INVALID:
                // Invalid values go to one extra bucket placed after the regular ones.
                outputRow.setField(outputPos, (double) (boundaries.length - 1));
                break;
            default:
                throw new UnsupportedOperationException(
                        "Unsupported " + HANDLE_INVALID + " type: " + handleInvalid);
        }
    }

    out.collect(outputRow);
}