in src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java [312:352]
/**
 * Hands the pending split assignment of each given reader to the enumerator context and, for a
 * bounded source with partition discovery disabled, signals those readers that no more splits
 * will follow.
 *
 * <p>Every message queue of a split that is handed out is recorded in {@code allocatedSet}.
 *
 * @param pendingReaders ids of the readers whose pending splits should be sent
 * @throws IllegalStateException if any of the given readers is not contained in {@code
 *     context.registeredReaders()}; in that case no state is modified and nothing is sent
 */
private void sendSplitChangesToRemote(Set<Integer> pendingReaders) {
    // Validate all readers before touching any state, so that a failure cannot leave
    // allocatedSet marking queues as allocated for splits that were never handed to the context.
    for (Integer pendingReader : pendingReaders) {
        if (!context.registeredReaders().containsKey(pendingReader)) {
            throw new IllegalStateException(
                    String.format(
                            "Reader %d is not registered to source coordinator",
                            pendingReader));
        }
    }

    Map<Integer, List<RocketMQSourceSplit>> incrementalSplit = new ConcurrentHashMap<>();
    for (Integer pendingReader : pendingReaders) {
        // NOTE(review): the pending entry is only read here, not removed. A later call for the
        // same reader will send the same splits again unless pendingSplitAssignmentMap is
        // updated elsewhere - confirm this is intended.
        final Set<RocketMQSourceSplit> pendingAssignmentForReader =
                this.pendingSplitAssignmentMap.get(pendingReader);
        if (pendingAssignmentForReader == null || pendingAssignmentForReader.isEmpty()) {
            continue;
        }

        // Put pending assignment into incremental assignment. Reader ids come from a Set, so
        // each id is visited once and a plain put of a copied list is sufficient.
        incrementalSplit.put(pendingReader, new ArrayList<>(pendingAssignmentForReader));
        pendingAssignmentForReader.forEach(
                split -> this.allocatedSet.add(split.getMessageQueue()));
    }

    // Assign pending splits to readers
    if (!incrementalSplit.isEmpty()) {
        log.info(
                "Enumerator assigning split(s) to readers {}",
                JSON.toJSONString(incrementalSplit, false));
        context.assignSplits(new SplitsAssignment<>(incrementalSplit));
    }

    // Sends NoMoreSplitsEvent to the readers if there is no more partition.
    if (partitionDiscoveryIntervalMs <= 0 && this.boundedness == Boundedness.BOUNDED) {
        log.info(
                "No more rocketmq partition to assign. "
                        + "Sending NoMoreSplitsEvent to the readers in consumer group {}.",
                groupId);
        pendingReaders.forEach(this.context::signalNoMoreSplits);
    }
}