private void sendSplitChangesToRemote()

in src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java [541:586]


    private void sendSplitChangesToRemote(Set<Integer> pendingReaders) {
        Map<Integer, List<RocketMQSourceSplit>> incrementalSplit = new ConcurrentHashMap<>();

        for (Integer pendingReader : pendingReaders) {
            if (!context.registeredReaders().containsKey(pendingReader)) {
                throw new IllegalStateException(
                        String.format(
                                "Reader %d is not registered to source coordinator",
                                pendingReader));
            }

            final Set<RocketMQSourceSplit> pendingAssignmentForReader =
                    this.pendingSplitAssignmentMap.remove(pendingReader);

            // Put pending assignment into incremental assignment
            if (pendingAssignmentForReader != null && !pendingAssignmentForReader.isEmpty()) {
                incrementalSplit
                        .computeIfAbsent(pendingReader, k -> new ArrayList<>())
                        .addAll(pendingAssignmentForReader);
                pendingAssignmentForReader.forEach(
                        split -> {
                            if (split.getIsIncrease()) {
                                this.allocatedSet.put(split.getMessageQueue(), (byte) 1);
                                reflectedQueueToTaskId.put(split.getMessageQueue(), pendingReader);
                            }
                        });
            }
        }

        // Assign pending splits to readers
        if (!incrementalSplit.isEmpty()) {
            log.info(
                    "Enumerator assigning split(s) to readers {}",
                    JSON.toJSONString(incrementalSplit, false));
            context.assignSplits(new SplitsAssignment<>(incrementalSplit));
        }

        // Sends NoMoreSplitsEvent to the readers if there is no more partition.
        if (partitionDiscoveryIntervalMs <= 0 && this.boundedness == Boundedness.BOUNDED) {
            log.info(
                    "No more rocketmq partition to assign. "
                            + "Sending NoMoreSplitsEvent to the readers in consumer group {}.",
                    groupId);
            pendingReaders.forEach(this.context::signalNoMoreSplits);
        }
    }