public static Function createKeyExtractor()

in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractor.java [84:110]


    /**
     * Builds the function that derives a key string from a {@link RowData} row.
     *
     * <p>When the schema declares a primary key, every column of the schema is indexed by name
     * together with its position, one {@code FieldFormatter} is created per primary-key column
     * (in the order the key lists them), and the formatters are wrapped in a {@code
     * KeyExtractor}. When the schema has no primary key, the returned function is serializable
     * and yields {@code null} for every row.
     *
     * @param schema table schema supplying the columns and the optional primary key
     * @param keyDelimiter passed unchanged to the {@code KeyExtractor} constructor
     * @return a key-extracting function, or a function that always returns {@code null} if the
     *     schema has no primary key
     */
    public static Function<RowData, String> createKeyExtractor(
            TableSchema schema, String keyDelimiter) {
        return schema.getPrimaryKey()
                .map(
                        primaryKey -> {
                            // Look-up table: column name -> column plus its position in the
                            // schema.
                            final Map<String, ColumnWithIndex> columnsByName = new HashMap<>();
                            final List<TableColumn> allColumns = schema.getTableColumns();
                            for (int position = 0;
                                    position < schema.getFieldCount();
                                    position++) {
                                final TableColumn current = allColumns.get(position);
                                columnsByName.put(
                                        current.getName(),
                                        new ColumnWithIndex(current, position));
                            }

                            // One formatter per key column, preserving the key's column order.
                            final FieldFormatter[] keyFormatters =
                                    primaryKey.getColumns().stream()
                                            .map(
                                                    keyColumnName -> {
                                                        final ColumnWithIndex keyColumn =
                                                                columnsByName.get(keyColumnName);
                                                        return toFormatter(
                                                                keyColumn.index,
                                                                keyColumn.getType());
                                                    })
                                            .toArray(FieldFormatter[]::new);

                            // Typed as the interface so that Optional#map infers
                            // Function<RowData, String> rather than KeyExtractor.
                            final Function<RowData, String> extractor =
                                    new KeyExtractor(keyFormatters, keyDelimiter);
                            return extractor;
                        })
                .orElseGet(() -> (Function<RowData, String> & Serializable) ignoredRow -> null);
    }