in flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilder.java [269:320]
public JdbcSource<OUT> build() {
this.connectionProvider = new SimpleJdbcConnectionProvider(connOptionsBuilder.build());
if (resultSetFetchSize > 0) {
this.configuration.set(JdbcSourceOptions.RESULTSET_FETCH_SIZE, resultSetFetchSize);
}
if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
Preconditions.checkArgument(
this.resultSetType == ResultSet.TYPE_SCROLL_INSENSITIVE
|| this.resultSetType == ResultSet.CONCUR_READ_ONLY,
"The 'resultSetType' must be ResultSet.TYPE_SCROLL_INSENSITIVE or ResultSet.CONCUR_READ_ONLY when using %s",
DeliveryGuarantee.EXACTLY_ONCE);
}
this.configuration.set(JdbcSourceOptions.RESULTSET_CONCURRENCY, resultSetConcurrency);
this.configuration.set(JdbcSourceOptions.RESULTSET_TYPE, resultSetType);
this.configuration.set(
JdbcSourceOptions.READER_FETCH_BATCH_SIZE, splitReaderFetchBatchSize);
this.configuration.set(JdbcSourceOptions.AUTO_COMMIT, autoCommit);
Preconditions.checkState(
!StringUtils.isNullOrWhitespaceOnly(sql), "'sql' mustn't be null or empty.");
Preconditions.checkNotNull(resultExtractor, "'resultExtractor' mustn't be null.");
Preconditions.checkNotNull(typeInformation, "'typeInformation' mustn't be null.");
if (Objects.nonNull(continuousUnBoundingSettings)) {
Preconditions.checkArgument(
Objects.nonNull(jdbcParameterValuesProvider)
&& jdbcParameterValuesProvider
instanceof JdbcSlideTimingParameterProvider,
INVALID_SLIDE_TIMING_CONTINUOUS_HINT);
}
if (Objects.nonNull(jdbcParameterValuesProvider)
&& jdbcParameterValuesProvider instanceof JdbcSlideTimingParameterProvider) {
Preconditions.checkArgument(
Objects.nonNull(continuousUnBoundingSettings),
INVALID_CONTINUOUS_SLIDE_TIMING_HINT);
}
return new JdbcSource<>(
configuration,
connectionProvider,
new SqlTemplateSplitEnumerator.TemplateSqlSplitEnumeratorProvider()
.setOptionalSqlSplitEnumeratorState(optionalSqlSplitEnumeratorState)
.setSqlTemplate(sql)
.setParameterValuesProvider(jdbcParameterValuesProvider),
resultExtractor,
typeInformation,
deliveryGuarantee,
continuousUnBoundingSettings);
}