in flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumerator.java [152:170]
/**
 * Serves a split request from one reader subtask.
 *
 * <p>The request is ignored when {@code subtask} is not present in
 * {@code context.registeredReaders()}. Otherwise the next split obtained from
 * {@link #getNextSplit()} is assigned to the subtask; when no split is returned, the subtask
 * is signalled that there are no more splits.
 *
 * @param subtask index of the reader subtask that asked for a split
 * @param hostname host the reader reported, or {@code null} if none was given; used only in
 *     the log message here
 */
private void assignSplitsForBounded(int subtask, @Nullable String hostname) {
    // Only readers known to the enumerator context may receive splits.
    final boolean readerRegistered = context.registeredReaders().containsKey(subtask);
    if (!readerRegistered) {
        return;
    }

    // Build the locality description only when it will actually be logged.
    if (LOG.isInfoEnabled()) {
        final String locality;
        if (hostname == null) {
            locality = "(no host locality info)";
        } else {
            locality = "(on host '" + hostname + "')";
        }
        LOG.info("Subtask {} {} is requesting a Jdbc source split", subtask, locality);
    }

    final Optional<JdbcSourceSplit> candidate = getNextSplit();
    if (!candidate.isPresent()) {
        // Nothing left to hand out: tell this reader that no more splits will come.
        context.signalNoMoreSplits(subtask);
        LOG.info("No more splits available for subtask {}", subtask);
        return;
    }

    final JdbcSourceSplit assigned = candidate.get();
    context.assignSplit(assigned, subtask);
    LOG.info("Assigned split to subtask {} : {}", subtask, assigned);
}