public void flatMap()

in flink-ml-lib/src/main/java/org/apache/flink/ml/feature/stringindexer/StringIndexerModel.java [151:206]


        /**
         * Maps every configured input column of {@code input} to its index value and emits a
         * copy of the row extended with one output field per input column.
         *
         * <p>On the first call the model data is read from the broadcast variable and converted
         * into one string-to-index lookup table per input column. The index of a string is its
         * position in the corresponding model string array.
         *
         * <p>A value that is not present in the lookup table is handled according to {@code
         * handleInValid}: the whole row is dropped, an exception is thrown, or the value is
         * mapped to the number of strings known for that column.
         *
         * @param input the row to transform; it is not modified
         * @param out collector receiving the extended row unless the row is skipped
         * @throws IllegalStateException if the model does not hold exactly one string array per
         *     input column
         * @throws RuntimeException if a column value is neither null, a string nor a number, or
         *     if an unseen value is encountered while errors are requested for invalid values
         * @throws UnsupportedOperationException if {@code handleInValid} is not a known option
         */
        public void flatMap(Row input, Collector<Row> out) {
            if (modelDataMap == null) {
                StringIndexerModelData modelData =
                        (StringIndexerModelData)
                                getRuntimeContext().getBroadcastVariable(broadcastModelKey).get(0);
                String[][] stringsArray = modelData.stringArrays;
                // Validate before touching the field: a mismatch would otherwise surface as an
                // ArrayIndexOutOfBoundsException here or as a NullPointerException on lookup.
                if (stringsArray.length != inputCols.length) {
                    throw new IllegalStateException(
                            "The model data contains "
                                    + stringsArray.length
                                    + " string arrays, but "
                                    + inputCols.length
                                    + " input columns are configured.");
                }
                modelDataMap = new HashMap[inputCols.length];
                for (int i = 0; i < stringsArray.length; i++) {
                    double idx = 0.0;
                    modelDataMap[i] = new HashMap<>(stringsArray[i].length);
                    for (String string : stringsArray[i]) {
                        modelDataMap[i].put(string, idx++);
                    }
                }
            }

            // Output fields are appended after the original fields of the input row.
            int outputOffset = input.getArity();
            Row result = RowUtils.cloneWithReservedFields(input, inputCols.length);
            for (int i = 0; i < inputCols.length; i++) {
                Object objVal = input.getField(inputCols[i]);
                String stringVal;
                if (null == objVal) {
                    stringVal = null;
                } else if (objVal instanceof String) {
                    stringVal = (String) objVal;
                } else if (objVal instanceof Number) {
                    stringVal = String.valueOf(objVal);
                } else {
                    throw new RuntimeException(
                            "The input column only supports string and numeric type.");
                }

                // Stored indices are never null, so a null result means the value is unseen;
                // a single lookup replaces the containsKey/get pair.
                Double index = modelDataMap[i].get(stringVal);
                if (index != null) {
                    result.setField(outputOffset + i, index);
                    continue;
                }

                switch (handleInValid) {
                    case SKIP_INVALID:
                        // Drop the whole row: nothing is emitted for this input.
                        return;
                    case ERROR_INVALID:
                        throw new RuntimeException(
                                "The input contains unseen string: "
                                        + stringVal
                                        + ". See "
                                        + HANDLE_INVALID
                                        + " parameter for more options.");
                    case KEEP_INVALID:
                        // Unseen values share the index one past the last known string.
                        result.setField(outputOffset + i, (double) modelDataMap[i].size());
                        break;
                    default:
                        throw new UnsupportedOperationException(
                                "Unsupported " + HANDLE_INVALID + " type: " + handleInValid);
                }
            }

            out.collect(result);
        }