in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java [234:258]
/**
 * Validates the table's primary key (if one is declared) and pairs every primary key index
 * with the logical type of the column found at that index in the resolved schema.
 *
 * <p>NOTE(review): the same indexes are used both to project the physical row data type and
 * to look up columns in the resolved schema; confirm both index spaces agree for tables that
 * declare non-physical columns.
 *
 * @param context factory context supplying the physical row type, the primary key indexes
 *     and the catalog table
 * @return one {@link LogicalTypeWithIndex} per primary key index, in the order the indexes
 *     are reported by the context; empty when there are no primary key indexes
 * @throws IllegalStateException if the resolved schema has no column at a primary key index
 */
List<LogicalTypeWithIndex> getPrimaryKeyLogicalTypesWithIndex(Context context) {
    final DataType rowType = context.getPhysicalRowDataType();
    final int[] pkIndexes = context.getPrimaryKeyIndexes();

    // Only validate when a primary key is actually declared.
    if (pkIndexes.length > 0) {
        ElasticsearchValidationUtils.validatePrimaryKey(
                Projection.of(pkIndexes).project(rowType));
    }

    final ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
    return Arrays.stream(pkIndexes)
            .mapToObj(
                    pkIndex -> {
                        // Fail loudly when the schema has no column at this position.
                        Column pkColumn =
                                schema.getColumn(pkIndex)
                                        .orElseThrow(
                                                () ->
                                                        new IllegalStateException(
                                                                String.format(
                                                                        "No primary key column found with index '%s'.",
                                                                        pkIndex)));
                        return new LogicalTypeWithIndex(
                                pkIndex, pkColumn.getDataType().getLogicalType());
                    })
            .collect(Collectors.toList());
}