private static JdbcBatchStatementExecutor createUpsertRowExecutor()

in flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/TableJdbcUpsertOutputFormat.java [152:200]


    /**
     * Creates the batch executor that writes {@link Row}s with upsert semantics.
     *
     * <p>The dialect is first asked for a single upsert statement. If it provides one, a simple
     * row executor over that statement is returned. Otherwise an {@link
     * InsertOrUpdateJdbcExecutor} is assembled from the dialect's row-exists, insert and update
     * statements, keyed by the positions of the key fields within the field names.
     *
     * @param opt DML options; key fields must be present (enforced via {@code checkArgument})
     *     and every key field must be one of {@code opt.getFieldNames()}
     * @param ctx runtime context; only the object-reuse flag of its execution config is read
     * @return the executor for the upsert path chosen by the dialect
     * @throws IllegalArgumentException if a key field is not contained in the field names
     */
    private static JdbcBatchStatementExecutor<Row> createUpsertRowExecutor(
            JdbcDmlOptions opt, RuntimeContext ctx) {
        checkArgument(opt.getKeyFields().isPresent());

        final String[] keyFields = opt.getKeyFields().get();
        final String[] fieldNames = opt.getFieldNames();
        final int[] fieldTypes = opt.getFieldTypes();
        final boolean objectReuse = ctx.getExecutionConfig().isObjectReuseEnabled();

        // Resolve each key field to its position in the field list. indexOf yields -1 for an
        // unknown name; fail fast here instead of letting -1 be used as an array index below
        // or be passed on to the key extractor.
        final int[] pkFields = new int[keyFields.length];
        for (int i = 0; i < keyFields.length; i++) {
            final int index = Arrays.asList(fieldNames).indexOf(keyFields[i]);
            if (index < 0) {
                throw new IllegalArgumentException(
                        "Key field '"
                                + keyFields[i]
                                + "' is not one of the field names "
                                + Arrays.toString(fieldNames));
            }
            pkFields[i] = index;
        }

        // Field types are optional; without them there are no key types either.
        final int[] pkTypes =
                fieldTypes == null
                        ? null
                        : Arrays.stream(pkFields).map(f -> fieldTypes[f]).toArray();

        return opt.getDialect()
                .getUpsertStatement(opt.getTableName(), fieldNames, keyFields)
                .map(
                        sql ->
                                createSimpleRowExecutor(
                                        parseNamedStatement(sql), fieldTypes, objectReuse))
                .orElseGet(
                        // No single upsert statement from the dialect: fall back to the
                        // exists / insert / update statement triple.
                        () ->
                                new InsertOrUpdateJdbcExecutor<>(
                                        parseNamedStatement(
                                                opt.getDialect()
                                                        .getRowExistsStatement(
                                                                opt.getTableName(), keyFields)),
                                        parseNamedStatement(
                                                opt.getDialect()
                                                        .getInsertIntoStatement(
                                                                opt.getTableName(), fieldNames)),
                                        parseNamedStatement(
                                                opt.getDialect()
                                                        .getUpdateStatement(
                                                                opt.getTableName(),
                                                                fieldNames,
                                                                keyFields)),
                                        createRowJdbcStatementBuilder(pkTypes),
                                        createRowJdbcStatementBuilder(fieldTypes),
                                        createRowJdbcStatementBuilder(fieldTypes),
                                        createRowKeyExtractor(pkFields),
                                        // Copy rows only when object reuse is enabled.
                                        objectReuse ? Row::copy : Function.identity()));
    }