in flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatBuilder.java [184:209]
/**
 * Builds a {@link TableInsertOrUpdateStatementExecutor} backed by three statements obtained from
 * the given dialect: a row-exists statement, an insert statement and an update statement.
 *
 * <p>The row-exists statement is prepared with {@code pkNames} as its named parameters; the
 * insert and update statements are prepared with {@code fieldNames}.
 *
 * @param dialect dialect that supplies the SQL text and the row converters
 * @param tableName table name handed to the dialect when rendering each statement
 * @param fieldNames names of all fields of a row
 * @param fieldTypes logical types of the row fields, used for the insert and update converters
 *     and for the key extractor
 * @param pkFields primary-key field indexes passed to {@code createRowKeyExtractor}
 * @param pkNames names of the primary-key fields
 * @param pkTypes logical types of the primary-key fields, used for the key converter
 * @return the assembled insert-or-update executor
 */
private static JdbcBatchStatementExecutor<RowData> createInsertOrUpdateExecutor(
        JdbcDialect dialect,
        String tableName,
        String[] fieldNames,
        LogicalType[] fieldTypes,
        int[] pkFields,
        String[] pkNames,
        LogicalType[] pkTypes) {
    // Render the SQL text once up front; the factories below only capture the strings.
    final String rowExistsSql = dialect.getRowExistsStatement(tableName, pkNames);
    final String insertSql = dialect.getInsertIntoStatement(tableName, fieldNames);
    final String updateSql = dialect.getUpdateStatement(tableName, fieldNames, pkNames);

    final RowType keyRowType = RowType.of(pkTypes);
    final RowType valueRowType = RowType.of(fieldTypes);

    return new TableInsertOrUpdateStatementExecutor(
            // Statement factories: exists, insert, update (in that order).
            conn -> FieldNamedPreparedStatement.prepareStatement(conn, rowExistsSql, pkNames),
            conn -> FieldNamedPreparedStatement.prepareStatement(conn, insertSql, fieldNames),
            conn -> FieldNamedPreparedStatement.prepareStatement(conn, updateSql, fieldNames),
            // Converters: key, insert, update. The insert and update converters are
            // requested separately so each statement gets its own converter instance.
            dialect.getRowConverter(keyRowType),
            dialect.getRowConverter(valueRowType),
            dialect.getRowConverter(valueRowType),
            createRowKeyExtractor(fieldTypes, pkFields));
}