in flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatBuilder.java [181:201]
private static JdbcBatchStatementExecutor<RowData> createUpsertRowExecutor(
JdbcDialect dialect,
String tableName,
String[] fieldNames,
LogicalType[] fieldTypes,
int[] pkFields,
String[] pkNames,
LogicalType[] pkTypes) {
return dialect.getUpsertStatement(tableName, fieldNames, pkNames)
.map(sql -> createSimpleRowExecutor(dialect, fieldNames, fieldTypes, sql))
.orElseGet(
() ->
createInsertOrUpdateExecutor(
dialect,
tableName,
fieldNames,
fieldTypes,
pkFields,
pkNames,
pkTypes));
}