in flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatBuilder.java [114:138]
/**
 * Builds a {@link TableBufferReducedStatementExecutor} for a table that has key fields.
 *
 * <p>The returned executor is composed of an upsert executor over all fields, a delete executor
 * over the key fields only, and a key extractor that reads the key fields from a row.
 *
 * @param opt DML options supplying the dialect, table name, field names and key fields; the
 *     key fields must be present
 * @param fieldTypes logical types of the fields, positionally aligned with {@code
 *     opt.getFieldNames()} (a key field's type is looked up at the index of its name)
 * @return the composed executor
 * @throws IllegalArgumentException if no key fields are configured, or if a key field is not
 *     one of the field names (raised through {@code checkArgument})
 */
private static JdbcBatchStatementExecutor<RowData> createBufferReduceExecutor(
        JdbcDmlOptions opt, LogicalType[] fieldTypes) {
    checkArgument(
            opt.getKeyFields().isPresent(),
            "Key fields must be present to create a buffer-reduced executor.");
    JdbcDialect dialect = opt.getDialect();
    String tableName = opt.getTableName();
    String[] fieldNames = opt.getFieldNames();
    String[] pkNames = opt.getKeyFields().get();

    // Resolve every key field to its position and type in a single pass.
    int[] pkFields = new int[pkNames.length];
    LogicalType[] pkTypes = new LogicalType[pkNames.length];
    for (int i = 0; i < pkNames.length; i++) {
        int fieldIndex = Arrays.asList(fieldNames).indexOf(pkNames[i]);
        // indexOf yields -1 for an unknown name; fail here with a clear message instead of
        // letting the -1 surface as an ArrayIndexOutOfBoundsException on fieldTypes below.
        checkArgument(
                fieldIndex >= 0,
                "Key field '"
                        + pkNames[i]
                        + "' is not among the field names "
                        + Arrays.toString(fieldNames)
                        + " of table '"
                        + tableName
                        + "'.");
        pkFields[i] = fieldIndex;
        pkTypes[i] = fieldTypes[fieldIndex];
    }

    return new TableBufferReducedStatementExecutor(
            createUpsertRowExecutor(
                    dialect, tableName, fieldNames, fieldTypes, pkFields, pkNames, pkTypes),
            createDeleteExecutor(dialect, tableName, pkNames, pkTypes),
            createRowKeyExtractor(fieldTypes, pkFields));
}