in flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/SqlTemplateSplitEnumerator.java [60:95]
public List<JdbcSourceSplit> enumerateSplits(@Nonnull Supplier<Boolean> splitGettable)
throws RuntimeException {
if (!splitGettable.get()) {
LOG.info(
"The current split is over max splits capacity of {}.",
JdbcSourceEnumerator.class.getSimpleName());
return Collections.emptyList();
}
if (parameterValuesProvider == null) {
return Collections.singletonList(
new JdbcSourceSplit(getNextId(), sqlTemplate, null, new CheckpointedOffset()));
}
if (optionalSqlSplitEnumeratorState != null) {
parameterValuesProvider.setOptionalState(optionalSqlSplitEnumeratorState);
}
Serializable[][] parameters = parameterValuesProvider.getParameterValues();
// update state
optionalSqlSplitEnumeratorState = parameterValuesProvider.getLatestOptionalState();
if (parameters == null) {
return Collections.singletonList(
new JdbcSourceSplit(getNextId(), sqlTemplate, null, new CheckpointedOffset()));
}
List<JdbcSourceSplit> jdbcSourceSplitList = new ArrayList<>(parameters.length);
for (Serializable[] paramArr : parameters) {
jdbcSourceSplitList.add(
new JdbcSourceSplit(
getNextId(), sqlTemplate, paramArr, new CheckpointedOffset()));
}
return jdbcSourceSplitList;
}