in flink-ml-lib/src/main/java/org/apache/flink/ml/feature/stringindexer/StringIndexerModel.java [151:206]
/**
 * Replaces each configured input column's value with its index from the broadcast model data
 * and emits the input row with those indices appended as trailing fields.
 *
 * <p>The per-column string-to-index lookup is built lazily on the first call. Values are looked
 * up as strings: {@code String} values as-is, {@code Number} values via {@link
 * String#valueOf(Object)}, and {@code null} as a {@code null} key. A value that is not present
 * in the model is handled according to {@code handleInValid}:
 *
 * <ul>
 *   <li>{@code SKIP_INVALID}: the whole row is dropped (nothing is collected);
 *   <li>{@code ERROR_INVALID}: an exception is thrown;
 *   <li>{@code KEEP_INVALID}: the value is mapped to the number of known strings for that
 *       column.
 * </ul>
 *
 * @param input the incoming row
 * @param out collector receiving the row with the appended index fields
 * @throws RuntimeException if an input value is neither a string nor a number, or if an unseen
 *     value is encountered under {@code ERROR_INVALID}
 * @throws IllegalStateException if the model data does not contain exactly one string array per
 *     input column
 * @throws UnsupportedOperationException if {@code handleInValid} is not a supported strategy
 */
public void flatMap(Row input, Collector<Row> out) {
    if (modelDataMap == null) {
        initModelDataMap();
    }
    // Index fields are appended after all of the original fields.
    final int offset = input.getArity();
    Row result = RowUtils.cloneWithReservedFields(input, inputCols.length);
    for (int i = 0; i < inputCols.length; i++) {
        String stringVal = toLookupKey(input.getField(inputCols[i]));
        // Stored indices are never null, so a null result means the value was unseen in fit.
        Double index = modelDataMap[i].get(stringVal);
        if (index != null) {
            result.setField(i + offset, index);
            continue;
        }
        switch (handleInValid) {
            case SKIP_INVALID:
                return;
            case ERROR_INVALID:
                throw new RuntimeException(
                        "The input contains unseen string: "
                                + stringVal
                                + ". See "
                                + HANDLE_INVALID
                                + " parameter for more options.");
            case KEEP_INVALID:
                // Unseen values share one extra bucket placed after all known indices.
                result.setField(i + offset, (double) modelDataMap[i].size());
                break;
            default:
                throw new UnsupportedOperationException(
                        "Unsupported " + HANDLE_INVALID + " type: " + handleInValid);
        }
    }
    out.collect(result);
}

/**
 * Builds one string-to-index lookup per input column from the broadcast model data. The index
 * of a string is its position in the model's string array for that column, stored as a double.
 *
 * @throws IllegalStateException if the model has a different number of string arrays than there
 *     are input columns
 */
@SuppressWarnings("unchecked")
private void initModelDataMap() {
    StringIndexerModelData modelData =
            (StringIndexerModelData)
                    getRuntimeContext().getBroadcastVariable(broadcastModelKey).get(0);
    String[][] stringsArray = modelData.stringArrays;
    // Columns are matched to model arrays by position, so the counts must agree; otherwise
    // lookups would fail later with an opaque index-out-of-bounds or null-pointer error.
    if (stringsArray.length != inputCols.length) {
        throw new IllegalStateException(
                "The model data contains "
                        + stringsArray.length
                        + " string arrays, but "
                        + inputCols.length
                        + " input columns are configured.");
    }
    modelDataMap = new HashMap[inputCols.length];
    for (int i = 0; i < stringsArray.length; i++) {
        double idx = 0.0;
        modelDataMap[i] = new HashMap<>(stringsArray[i].length);
        for (String string : stringsArray[i]) {
            modelDataMap[i].put(string, idx++);
        }
    }
}

/**
 * Converts a raw column value into the string key used for model lookup: strings are returned
 * as-is, numbers via {@link String#valueOf(Object)}, and {@code null} stays {@code null}.
 *
 * @throws RuntimeException if the value is neither a string nor a number
 */
private String toLookupKey(Object objVal) {
    if (objVal == null || objVal instanceof String) {
        return (String) objVal;
    }
    if (objVal instanceof Number) {
        return String.valueOf(objVal);
    }
    throw new RuntimeException("The input column only supports string and numeric type.");
}