public JdbcSource build()

in flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilder.java [269:320]


    public JdbcSource<OUT> build() {
        this.connectionProvider = new SimpleJdbcConnectionProvider(connOptionsBuilder.build());
        if (resultSetFetchSize > 0) {
            this.configuration.set(JdbcSourceOptions.RESULTSET_FETCH_SIZE, resultSetFetchSize);
        }

        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
            Preconditions.checkArgument(
                    this.resultSetType == ResultSet.TYPE_SCROLL_INSENSITIVE
                            || this.resultSetType == ResultSet.CONCUR_READ_ONLY,
                    "The 'resultSetType' must be ResultSet.TYPE_SCROLL_INSENSITIVE or ResultSet.CONCUR_READ_ONLY when using %s",
                    DeliveryGuarantee.EXACTLY_ONCE);
        }

        this.configuration.set(JdbcSourceOptions.RESULTSET_CONCURRENCY, resultSetConcurrency);
        this.configuration.set(JdbcSourceOptions.RESULTSET_TYPE, resultSetType);
        this.configuration.set(
                JdbcSourceOptions.READER_FETCH_BATCH_SIZE, splitReaderFetchBatchSize);
        this.configuration.set(JdbcSourceOptions.AUTO_COMMIT, autoCommit);

        Preconditions.checkState(
                !StringUtils.isNullOrWhitespaceOnly(sql), "'sql' mustn't be null or empty.");
        Preconditions.checkNotNull(resultExtractor, "'resultExtractor' mustn't be null.");
        Preconditions.checkNotNull(typeInformation, "'typeInformation' mustn't be null.");

        if (Objects.nonNull(continuousUnBoundingSettings)) {
            Preconditions.checkArgument(
                    Objects.nonNull(jdbcParameterValuesProvider)
                            && jdbcParameterValuesProvider
                                    instanceof JdbcSlideTimingParameterProvider,
                    INVALID_SLIDE_TIMING_CONTINUOUS_HINT);
        }

        if (Objects.nonNull(jdbcParameterValuesProvider)
                && jdbcParameterValuesProvider instanceof JdbcSlideTimingParameterProvider) {
            Preconditions.checkArgument(
                    Objects.nonNull(continuousUnBoundingSettings),
                    INVALID_CONTINUOUS_SLIDE_TIMING_HINT);
        }

        return new JdbcSource<>(
                configuration,
                connectionProvider,
                new SqlTemplateSplitEnumerator.TemplateSqlSplitEnumeratorProvider()
                        .setOptionalSqlSplitEnumeratorState(optionalSqlSplitEnumeratorState)
                        .setSqlTemplate(sql)
                        .setParameterValuesProvider(jdbcParameterValuesProvider),
                resultExtractor,
                typeInformation,
                deliveryGuarantee,
                continuousUnBoundingSettings);
    }