in flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReader.java [97:122]
/**
 * Creates a split reader and initializes its state from the given collaborators and
 * configuration.
 *
 * <p>The result-set type, result-set concurrency, result-set fetch size, auto-commit flag and
 * reader fetch batch size are read from {@code config}. The reader starts with an empty split
 * queue, no current split, no pending record and a split offset of {@code 0}.
 *
 * @param context source reader context; stored for later use
 * @param config configuration from which the reader options listed above are read
 * @param typeInformation type information of the produced records; stored for later use
 * @param connectionProvider provider of the JDBC connection; stored for later use
 * @param deliveryGuarantee delivery guarantee the reader is created with; stored for later use
 * @param resultExtractor extractor used to produce records of type {@code T}; stored for later
 *     use
 * @throws NullPointerException if any of the arguments is {@code null}
 * @throws IllegalArgumentException if the configured reader fetch batch size is not strictly
 *     between {@code 0} and {@link Integer#MAX_VALUE}
 */
public JdbcSourceSplitReader(
        SourceReaderContext context,
        Configuration config,
        TypeInformation<T> typeInformation,
        JdbcConnectionProvider connectionProvider,
        DeliveryGuarantee deliveryGuarantee,
        ResultExtractor<T> resultExtractor) {
    // Every precondition carries a message so that a failure names the offending argument
    // or option instead of surfacing as a bare exception with a null message.
    this.context = Preconditions.checkNotNull(context, "context must not be null");
    this.config = Preconditions.checkNotNull(config, "config must not be null");
    this.typeInformation =
            Preconditions.checkNotNull(typeInformation, "typeInformation must not be null");
    this.connectionProvider =
            Preconditions.checkNotNull(
                    connectionProvider, "connectionProvider must not be null");

    // Result-set and connection options are taken from the configuration.
    this.resultSetType = config.get(RESULTSET_TYPE);
    this.resultSetConcurrency = config.get(RESULTSET_CONCURRENCY);
    this.resultSetFetchSize = config.get(RESULTSET_FETCH_SIZE);
    this.autoCommit = config.get(AUTO_COMMIT);

    this.deliveryGuarantee =
            Preconditions.checkNotNull(deliveryGuarantee, "deliveryGuarantee must not be null");

    // Initial reading state: nothing queued and nothing in progress.
    this.splits = new ArrayDeque<>();
    this.hasNextRecordCurrentSplit = false;
    this.currentSplit = null;

    // The batch size must lie in the open interval (0, Integer.MAX_VALUE).
    int splitReaderFetchBatchSize = config.get(READER_FETCH_BATCH_SIZE);
    Preconditions.checkArgument(
            splitReaderFetchBatchSize > 0 && splitReaderFetchBatchSize < Integer.MAX_VALUE,
            "'%s' must be greater than 0 and less than %s, but was %s.",
            READER_FETCH_BATCH_SIZE.key(),
            Integer.MAX_VALUE,
            splitReaderFetchBatchSize);
    this.splitReaderFetchBatchSize = splitReaderFetchBatchSize;

    this.resultExtractor =
            Preconditions.checkNotNull(resultExtractor, "resultExtractor must not be null");
    this.currentSplitOffset = 0;
}