in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java [128:148]
public void handleSplitsChanges(SplitsChange<MongoSourceSplit> splitsChanges) {
    // Installs the scan split carried by a split addition as the split this reader works on:
    // stores it in currentSplit and resets the finished flag.
    //
    // Throws UnsupportedOperationException if the change is not a SplitsAddition or if the
    // first split is not a MongoScanSourceSplit.
    // Throws IllegalArgumentException if the addition carries no split at all.
    LOG.debug("Handle split changes {}", splitsChanges);

    if (!(splitsChanges instanceof SplitsAddition)) {
        throw new UnsupportedOperationException(
                String.format(
                        "The SplitChange type of %s is not supported.",
                        splitsChanges.getClass()));
    }

    // Guard the get(0) below: without this check an empty addition fails with a bare
    // IndexOutOfBoundsException that gives no hint about the actual cause.
    if (splitsChanges.splits().isEmpty()) {
        throw new IllegalArgumentException(
                "The SplitsAddition does not contain any split to read.");
    }

    // Only the first split is tracked (there is a single currentSplit slot). Surface the
    // situation instead of dropping the remaining splits without any trace.
    if (splitsChanges.splits().size() > 1) {
        LOG.warn(
                "Received {} splits in a single addition but only one split is handled at a time; "
                        + "only the first split will be read.",
                splitsChanges.splits().size());
    }

    MongoSourceSplit sourceSplit = splitsChanges.splits().get(0);
    if (!(sourceSplit instanceof MongoScanSourceSplit)) {
        throw new UnsupportedOperationException(
                String.format(
                        "The SourceSplit type of %s is not supported.",
                        sourceSplit.getClass()));
    }

    this.currentSplit = (MongoScanSourceSplit) sourceSplit;
    this.finished = false;
}