in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractor.java [84:110]
/**
 * Creates the function that turns a row into its key string.
 *
 * <p>If {@code schema} declares a primary key, the returned function is a {@link KeyExtractor}
 * built from one {@code FieldFormatter} per primary-key column (in the iteration order of the
 * key's column list) and the given {@code keyDelimiter}. If the schema has no primary key, the
 * returned function yields {@code null} for every row.
 *
 * @param schema table schema whose primary key, if any, selects the key columns
 * @param keyDelimiter delimiter handed to the {@link KeyExtractor}
 * @return a function mapping a row to its key, or to {@code null} when no primary key exists
 */
public static Function<RowData, String> createKeyExtractor(
        TableSchema schema, String keyDelimiter) {
    return schema.getPrimaryKey()
            .map(
                    primaryKey -> {
                        // Index every column by name and remember its position in the
                        // schema; that position is what the formatter is created with.
                        List<TableColumn> columns = schema.getTableColumns();
                        Map<String, ColumnWithIndex> columnsByName = new HashMap<>();
                        for (int position = 0;
                                position < schema.getFieldCount();
                                position++) {
                            TableColumn current = columns.get(position);
                            columnsByName.put(
                                    current.getName(),
                                    new ColumnWithIndex(current, position));
                        }

                        // Resolve each primary-key column name and build its formatter.
                        FieldFormatter[] keyFormatters =
                                primaryKey.getColumns().stream()
                                        .map(
                                                name -> {
                                                    ColumnWithIndex keyColumn =
                                                            columnsByName.get(name);
                                                    return toFormatter(
                                                            keyColumn.index,
                                                            keyColumn.getType());
                                                })
                                        .toArray(FieldFormatter[]::new);

                        return (Function<RowData, String>)
                                new KeyExtractor(keyFormatters, keyDelimiter);
                    })
            // The intersection cast keeps the fallback lambda serializable.
            .orElseGet(
                    () -> (Function<RowData, String> & Serializable) ignoredRow -> null);
}