public void flatMap()

in flink-ml-lib/src/main/java/org/apache/flink/ml/feature/bucketizer/Bucketizer.java [105:151]


        public void flatMap(Row value, Collector<Row> out) {
            Row result = RowUtils.cloneWithReservedFields(value, inputCols.length);

            for (int i = 0; i < inputCols.length; i++) {
                double feature = ((Number) value.getField(inputCols[i])).doubleValue();
                Double[] splits = splitsArray[i];
                boolean isInvalid = false;

                if (!Double.isNaN(feature)) {
                    double index = Arrays.binarySearch(splits, feature);
                    if (index >= 0) {
                        if (index == splits.length - 1) {
                            index--;
                        }
                        result.setField(i + value.getArity(), index);
                    } else {
                        index = -index - 1;
                        if (index == 0 || index == splits.length) {
                            isInvalid = true;
                        } else {
                            result.setField(i + value.getArity(), index - 1);
                        }
                    }
                } else {
                    isInvalid = true;
                }

                if (isInvalid) {
                    switch (handleInvalid) {
                        case ERROR_INVALID:
                            throw new RuntimeException(
                                    "The input contains invalid value. See "
                                            + HANDLE_INVALID
                                            + " parameter for more options.");
                        case SKIP_INVALID:
                            return;
                        case KEEP_INVALID:
                            result.setField(i + value.getArity(), (double) splits.length - 1);
                            break;
                        default:
                            throw new UnsupportedOperationException(
                                    "Unsupported " + HANDLE_INVALID + " type: " + handleInvalid);
                    }
                }
            }
            out.collect(result);
        }