in flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/TableJdbcUpsertOutputFormat.java [152:200]
private static JdbcBatchStatementExecutor<Row> createUpsertRowExecutor(
JdbcDmlOptions opt, RuntimeContext ctx) {
checkArgument(opt.getKeyFields().isPresent());
int[] pkFields =
Arrays.stream(opt.getKeyFields().get())
.mapToInt(Arrays.asList(opt.getFieldNames())::indexOf)
.toArray();
int[] pkTypes =
opt.getFieldTypes() == null
? null
: Arrays.stream(pkFields).map(f -> opt.getFieldTypes()[f]).toArray();
return opt.getDialect()
.getUpsertStatement(
opt.getTableName(), opt.getFieldNames(), opt.getKeyFields().get())
.map(
sql ->
createSimpleRowExecutor(
parseNamedStatement(sql),
opt.getFieldTypes(),
ctx.getExecutionConfig().isObjectReuseEnabled()))
.orElseGet(
() ->
new InsertOrUpdateJdbcExecutor<>(
parseNamedStatement(
opt.getDialect()
.getRowExistsStatement(
opt.getTableName(),
opt.getKeyFields().get())),
parseNamedStatement(
opt.getDialect()
.getInsertIntoStatement(
opt.getTableName(),
opt.getFieldNames())),
parseNamedStatement(
opt.getDialect()
.getUpdateStatement(
opt.getTableName(),
opt.getFieldNames(),
opt.getKeyFields().get())),
createRowJdbcStatementBuilder(pkTypes),
createRowJdbcStatementBuilder(opt.getFieldTypes()),
createRowJdbcStatementBuilder(opt.getFieldTypes()),
createRowKeyExtractor(pkFields),
ctx.getExecutionConfig().isObjectReuseEnabled()
? Row::copy
: Function.identity()));
}