in flink-ml-lib/src/main/java/org/apache/flink/ml/feature/hashingtf/HashingTF.java [118:158]
// Appends a term-frequency sparse vector to the given row.
//
// The value of the input column is read as a sequence of terms. Each term is hashed with
// hash(...) and reduced to a bucket index with nonNegativeMod(hash, numFeatures). The
// resulting vector has size numFeatures; each occupied bucket holds the number of terms
// that landed in it, or 1.0 when binary mode is enabled.
//
// Accepted input values: object arrays, primitive arrays (elements are boxed), and any
// Iterable.
//
// Throws NullPointerException if the input column value is null.
// Throws IllegalArgumentException if the value is neither an array nor an Iterable.
public Row map(Row row) throws Exception {
    Object inputObj = row.getField(inputCol);
    if (inputObj == null) {
        // Fail with context instead of a bare NPE from inputObj.getClass().
        throw new NullPointerException(
                "The value of input column " + inputCol + " must not be null.");
    }

    Iterable<?> inputList;
    if (inputObj instanceof Object[]) {
        inputList = Arrays.asList((Object[]) inputObj);
    } else if (inputObj.getClass().isArray()) {
        // Primitive arrays (int[], double[], ...) cannot be cast to Object[]; box each
        // element reflectively so they are hashed like their wrapper-typed equivalents.
        // NOTE(review): assumes hash(...) accepts boxed primitive values - confirm.
        int length = java.lang.reflect.Array.getLength(inputObj);
        Object[] boxed = new Object[length];
        for (int i = 0; i < length; i++) {
            boxed[i] = java.lang.reflect.Array.get(inputObj, i);
        }
        inputList = Arrays.asList(boxed);
    } else if (inputObj instanceof Iterable) {
        inputList = (Iterable<?>) inputObj;
    } else {
        throw new IllegalArgumentException(
                "Input format "
                        + inputObj.getClass().getCanonicalName()
                        + " is not supported for input column "
                        + inputCol
                        + ". Supported options are Array and Iterable.");
    }

    // Bucket index -> number of terms hashed into that bucket (capped at 1 in binary mode).
    Map<Integer, Integer> counts = new HashMap<>();
    for (Object obj : inputList) {
        int index = nonNegativeMod(hash(obj), numFeatures);
        if (binary) {
            counts.put(index, 1);
        } else {
            counts.merge(index, 1, Integer::sum);
        }
    }

    // Converts from map to a sparse vector. HashMap iteration order is unspecified, so the
    // indices are sorted first to make the output deterministic and strictly ascending.
    int[] indices = new int[counts.size()];
    int pos = 0;
    for (int index : counts.keySet()) {
        indices[pos++] = index;
    }
    Arrays.sort(indices);

    double[] values = new double[indices.length];
    for (int i = 0; i < indices.length; i++) {
        values[i] = counts.get(indices[i]);
    }
    return RowUtils.append(row, Vectors.sparse(numFeatures, indices, values));
}