private Tuple2 computeVectorSizeAndNnz()

in flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java [128:179]


        /**
         * Scans every configured input column of the given row and accumulates the length of the
         * vector to be assembled together with the number of entries counted as non-zero.
         *
         * <p>Per-column accounting:
         *
         * <ul>
         *   <li>{@link Number}: contributes 1 to both the size and the nnz count.
         *   <li>{@code SparseVector}: contributes its size to the total size and the length of
         *       its index array to the nnz count.
         *   <li>{@code DenseVector}: contributes its size to both totals.
         *   <li>{@code null}: contributes the declared size of the column ({@code inputSizes})
         *       to both totals.
         * </ul>
         *
         * <p>For non-null fields the actual size is passed to {@code checkSize} along with the
         * declared size of the column before it is counted.
         *
         * <p>Side effect: when {@code keepInvalid} is set, a null field is replaced in
         * {@code value} with a NaN placeholder: a {@code DenseVector} filled with NaN if the
         * declared size is greater than 1, otherwise a single {@code Double.NaN}.
         *
         * @param value the row whose input columns are inspected; null fields may be overwritten
         *     as described above
         * @return a tuple of (total vector size, total nnz count)
         * @throws RuntimeException if {@code keepInvalid} is not set and a field is null or is a
         *     NaN number
         * @throws IllegalArgumentException if a field is neither a number nor a supported vector
         */
        private Tuple2<Integer, Integer> computeVectorSizeAndNnz(Row value) {
            int totalSize = 0;
            int totalNnz = 0;

            for (int col = 0; col < inputCols.length; col++) {
                final Object field = value.getField(inputCols[col]);

                // Missing value: account for the declared size, then either patch the row with
                // a NaN placeholder or fail, depending on the invalid-handling mode.
                if (field == null) {
                    final int declaredSize = inputSizes[col];
                    totalSize += declaredSize;
                    totalNnz += declaredSize;

                    if (!keepInvalid) {
                        throw new RuntimeException(
                                "Input column value is null. Please check the input data or using handleInvalid = 'keep'.");
                    }

                    if (declaredSize > 1) {
                        final DenseVector nanVector = new DenseVector(declaredSize);
                        int pos = 0;
                        while (pos < declaredSize) {
                            nanVector.values[pos] = Double.NaN;
                            pos++;
                        }
                        value.setField(inputCols[col], nanVector);
                    } else {
                        value.setField(inputCols[col], Double.NaN);
                    }
                    continue;
                }

                if (field instanceof Number) {
                    // A scalar always occupies exactly one slot.
                    checkSize(inputSizes[col], 1);
                    final boolean isNaN = Double.isNaN(((Number) field).doubleValue());
                    if (isNaN && !keepInvalid) {
                        throw new RuntimeException(
                                "Encountered NaN while assembling a row with handleInvalid = 'error'. Consider "
                                        + "removing NaNs from dataset or using handleInvalid = 'keep' or 'skip'.");
                    }
                    totalSize++;
                    totalNnz++;
                } else if (field instanceof SparseVector) {
                    final SparseVector sparse = (SparseVector) field;
                    final int length = sparse.size();
                    checkSize(inputSizes[col], length);
                    // Only the explicitly stored entries count towards nnz.
                    totalNnz += sparse.indices.length;
                    totalSize += length;
                } else if (field instanceof DenseVector) {
                    final DenseVector dense = (DenseVector) field;
                    final int length = dense.size();
                    checkSize(inputSizes[col], length);
                    // Every entry of a dense vector is counted, regardless of its value.
                    totalSize += length;
                    totalNnz += length;
                } else {
                    throw new IllegalArgumentException(
                            String.format(
                                    "Input type %s has not been supported yet. Only Vector and Number types are supported.",
                                    field.getClass()));
                }
            }

            return Tuple2.of(totalSize, totalNnz);
        }