in flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatBuilder.java [129:163]
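/**
 * Creates a {@link JdbcBatchStatementExecutor} that buffers incoming rows and reduces them per
 * primary key before flushing, delegating to an upsert executor for insert/update rows and to a
 * delete executor for delete rows. Key fields must be present in the given {@link JdbcDmlOptions}.
 */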
private static JdbcBatchStatementExecutor<RowData> createBufferReduceExecutor(
        JdbcDmlOptions opt,
        RuntimeContext ctx,
        TypeInformation<RowData> rowDataTypeInfo,
        LogicalType[] fieldTypes) {
    checkArgument(opt.getKeyFields().isPresent());
    JdbcDialect dialect = opt.getDialect();
    String tableName = opt.getTableName();
    String[] pkNames = opt.getKeyFields().get();
    // Resolve the primary-key names to their positions and logical types within the full row.
    int[] pkFields =
            Arrays.stream(pkNames)
                    .mapToInt(Arrays.asList(opt.getFieldNames())::indexOf)
                    .toArray();
    LogicalType[] pkTypes =
            Arrays.stream(pkFields).mapToObj(f -> fieldTypes[f]).toArray(LogicalType[]::new);
    // With object reuse enabled, buffered rows must be deep-copied so that later records do not
    // mutate the instances already held in the reduce buffer.
    final TypeSerializer<RowData> typeSerializer =
            rowDataTypeInfo.createSerializer(ctx.getExecutionConfig());
    final Function<RowData, RowData> valueTransform =
            ctx.getExecutionConfig().isObjectReuseEnabled()
                    ? typeSerializer::copy
                    : Function.identity();
    return new TableBufferReducedStatementExecutor(
            createUpsertRowExecutor(
                    dialect,
                    tableName,
                    opt.getFieldNames(),
                    fieldTypes,
                    pkFields,
                    pkNames,
                    pkTypes),
            createDeleteExecutor(dialect, tableName, pkNames, pkTypes),
            createRowKeyExtractor(fieldTypes, pkFields),
            valueTransform);
}