in flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/TableJdbcUpsertOutputFormat.java [151:191]
/**
 * Builds the batch executor that upserts rows according to the given DML options.
 *
 * <p>When the dialect returns an upsert statement, that statement is wrapped in a simple row
 * executor. Otherwise an {@code InsertOrUpdateJdbcExecutor} is assembled from the dialect's
 * row-exists, insert and update statements.
 *
 * @param opt DML options; key fields must be present and every key field must also be listed
 *     in the field names
 * @return the executor performing the upsert
 * @throws IllegalArgumentException (raised through {@code checkArgument}) if no key fields are
 *     configured or a key field is not contained in the field names
 */
private static JdbcBatchStatementExecutor<Row> createUpsertRowExecutor(JdbcDmlOptions opt) {
    checkArgument(
            opt.getKeyFields().isPresent(),
            "Key fields must be present to create an upsert executor.");
    final String[] keyFields = opt.getKeyFields().get();
    final String[] fieldNames = opt.getFieldNames();
    final int[] fieldTypes = opt.getFieldTypes();

    // Position of every key field within the full field list.
    final int[] pkFields =
            Arrays.stream(keyFields).mapToInt(Arrays.asList(fieldNames)::indexOf).toArray();
    // indexOf yields -1 for an unknown name; fail fast with a clear message instead of an
    // ArrayIndexOutOfBoundsException below or a broken key extractor at runtime.
    for (int i = 0; i < pkFields.length; i++) {
        checkArgument(
                pkFields[i] >= 0,
                "Key field '%s' is not contained in the field names %s.",
                keyFields[i],
                Arrays.toString(fieldNames));
    }

    // Field types are optional; key types are only derived when they are available.
    final int[] pkTypes =
            fieldTypes == null
                    ? null
                    : Arrays.stream(pkFields).map(f -> fieldTypes[f]).toArray();

    return opt.getDialect()
            .getUpsertStatement(opt.getTableName(), fieldNames, keyFields)
            .map(sql -> createSimpleRowExecutor(parseNamedStatement(sql), fieldTypes))
            .orElseGet(
                    // No upsert statement from the dialect: fall back to exists/insert/update.
                    () ->
                            new InsertOrUpdateJdbcExecutor<>(
                                    parseNamedStatement(
                                            opt.getDialect()
                                                    .getRowExistsStatement(
                                                            opt.getTableName(), keyFields)),
                                    parseNamedStatement(
                                            opt.getDialect()
                                                    .getInsertIntoStatement(
                                                            opt.getTableName(), fieldNames)),
                                    parseNamedStatement(
                                            opt.getDialect()
                                                    .getUpdateStatement(
                                                            opt.getTableName(),
                                                            fieldNames,
                                                            keyFields)),
                                    createRowJdbcStatementBuilder(pkTypes),
                                    createRowJdbcStatementBuilder(fieldTypes),
                                    createRowJdbcStatementBuilder(fieldTypes),
                                    createRowKeyExtractor(pkFields),
                                    Function.identity()));
}