in flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java [128:179]
/**
 * Walks every configured input column of the row and accumulates the total length of the
 * assembled vector together with the number of entries that will be stored in it.
 *
 * <p>Per-column accounting:
 *
 * <ul>
 *   <li>{@link Number}: contributes one slot and one stored entry.
 *   <li>{@code SparseVector}: contributes its {@code size()} slots and {@code indices.length}
 *       stored entries.
 *   <li>{@code DenseVector}: contributes its {@code size()} for both slots and stored entries.
 *   <li>{@code null}: contributes {@code inputSizes[i]} for both counts; when {@code keepInvalid}
 *       is set, the field is overwritten in the row with a NaN placeholder.
 * </ul>
 *
 * <p>For non-null values the declared size {@code inputSizes[i]} and the observed size are handed
 * to {@code checkSize} before anything is accumulated.
 *
 * @param value the row to inspect; null fields are replaced in place when {@code keepInvalid} is
 *     set
 * @return a tuple of (total vector size, total number of stored entries)
 * @throws RuntimeException if a scalar input is NaN or a field is null while {@code keepInvalid}
 *     is not set
 * @throws IllegalArgumentException if a field is neither a {@link Number} nor a supported vector
 */
private Tuple2<Integer, Integer> computeVectorSizeAndNnz(Row value) {
    int totalSize = 0;
    int totalNnz = 0;

    for (int col = 0; col < inputCols.length; col++) {
        final Object field = value.getField(inputCols[col]);

        if (field == null) {
            // A missing value still occupies its declared width in the output vector.
            final int width = inputSizes[col];
            totalSize += width;
            totalNnz += width;

            if (!keepInvalid) {
                throw new RuntimeException(
                        "Input column value is null. Please check the input data or using handleInvalid = 'keep'.");
            }

            // Write a NaN placeholder of the declared width back into the row.
            if (width > 1) {
                final DenseVector placeholder = new DenseVector(width);
                for (int k = 0; k < width; k++) {
                    placeholder.values[k] = Double.NaN;
                }
                value.setField(inputCols[col], placeholder);
            } else {
                value.setField(inputCols[col], Double.NaN);
            }
            continue;
        }

        if (field instanceof Number) {
            checkSize(inputSizes[col], 1);
            final boolean isNaN = Double.isNaN(((Number) field).doubleValue());
            if (isNaN && !keepInvalid) {
                throw new RuntimeException(
                        "Encountered NaN while assembling a row with handleInvalid = 'error'. Consider "
                                + "removing NaNs from dataset or using handleInvalid = 'keep' or 'skip'.");
            }
            totalSize++;
            totalNnz++;
        } else if (field instanceof SparseVector) {
            final SparseVector sparse = (SparseVector) field;
            final int length = sparse.size();
            checkSize(inputSizes[col], length);
            totalSize += length;
            // Only the explicitly stored indices count as entries.
            totalNnz += sparse.indices.length;
        } else if (field instanceof DenseVector) {
            final DenseVector dense = (DenseVector) field;
            final int length = dense.size();
            checkSize(inputSizes[col], length);
            totalSize += length;
            // Every slot of a dense vector is counted as a stored entry.
            totalNnz += length;
        } else {
            throw new IllegalArgumentException(
                    String.format(
                            "Input type %s has not been supported yet. Only Vector and Number types are supported.",
                            field.getClass()));
        }
    }

    return Tuple2.of(totalSize, totalNnz);
}