public JdbcOutputFormat build()

in flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatBuilder.java [89:127]


    public JdbcOutputFormat<RowData, ?, ?> build() {
        checkNotNull(jdbcOptions, "jdbc options can not be null");
        checkNotNull(dmlOptions, "jdbc dml options can not be null");
        checkNotNull(executionOptions, "jdbc execution options can not be null");

        final LogicalType[] logicalTypes =
                Arrays.stream(fieldDataTypes)
                        .map(DataType::getLogicalType)
                        .toArray(LogicalType[]::new);
        if (dmlOptions.getKeyFields().isPresent() && dmlOptions.getKeyFields().get().length > 0) {
            // upsert query
            return new JdbcOutputFormat<>(
                    new SimpleJdbcConnectionProvider(jdbcOptions),
                    executionOptions,
                    ctx ->
                            createBufferReduceExecutor(
                                    dmlOptions, ctx, rowDataTypeInformation, logicalTypes),
                    JdbcOutputFormat.RecordExtractor.identity());
        } else {
            // append only query
            final String sql =
                    dmlOptions
                            .getDialect()
                            .getInsertIntoStatement(
                                    dmlOptions.getTableName(), dmlOptions.getFieldNames());
            return new JdbcOutputFormat<>(
                    new SimpleJdbcConnectionProvider(jdbcOptions),
                    executionOptions,
                    ctx ->
                            createSimpleBufferedExecutor(
                                    ctx,
                                    dmlOptions.getDialect(),
                                    dmlOptions.getFieldNames(),
                                    logicalTypes,
                                    sql,
                                    rowDataTypeInformation),
                    JdbcOutputFormat.RecordExtractor.identity());
        }
    }