in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/KeyExtractor.java [62:77]
/**
 * Builds the function used to derive a key string from a {@link RowData}.
 *
 * <p>When no primary key columns are supplied, the returned function yields {@code null} for
 * every row. Otherwise one {@link FieldFormatter} is created per primary key column, in the
 * order given, and handed to a new {@link KeyExtractor} together with the delimiter.
 *
 * @param primaryKeyTypesWithIndex the primary key columns, each carrying its field index and
 *     logical type; may be empty
 * @param keyDelimiter delimiter passed through to the {@link KeyExtractor} (presumably placed
 *     between the formatted key parts; see that class for the exact use)
 * @return a serializable function mapping a row to its key, or to {@code null} when there is
 *     no primary key
 */
public static SerializableFunction<RowData, String> createKeyExtractor(
        List<LogicalTypeWithIndex> primaryKeyTypesWithIndex, String keyDelimiter) {
    // No primary key: every row maps to a null key.
    if (primaryKeyTypesWithIndex.isEmpty()) {
        return ignored -> null;
    }

    // One formatter per key column, preserving the order of the incoming list.
    final FieldFormatter[] keyFormatters = new FieldFormatter[primaryKeyTypesWithIndex.size()];
    int position = 0;
    for (LogicalTypeWithIndex keyColumn : primaryKeyTypesWithIndex) {
        keyFormatters[position] = toFormatter(keyColumn.index, keyColumn.logicalType);
        position++;
    }
    return new KeyExtractor(keyFormatters, keyDelimiter);
}