private void calculateSplitAssignment()

in src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java [401:538]


    private void calculateSplitAssignment(SourceSplitChangeResult sourceSplitChangeResult) {
        Map<Integer, Set<RocketMQSourceSplit>> newSourceSplitAllocateMap;

        // Preliminary calculation of distribution results
        {
            // Allocate valid queues
            if (sourceSplitChangeResult.decreaseSet != null
                    && !sourceSplitChangeResult.decreaseSet.isEmpty()) {
                partitionId = 0;

                // Re-load balancing
                Set<RocketMQSourceSplit> allMQ = new HashSet<>();
                OffsetsSelector.MessageQueueOffsetsRetriever offsetsRetriever =
                        new InnerConsumerImpl.RemotingOffsetsRetrieverImpl(consumer);
                Map<MessageQueue, Long> stoppingOffsets =
                        stoppingOffsetsSelector.getMessageQueueOffsets(
                                allocatedSet.keySet(), offsetsRetriever);
                Set<MessageQueue> delete =
                        sourceSplitChangeResult.decreaseSet.stream()
                                .map(RocketMQSourceSplit::getMessageQueue)
                                .collect(Collectors.toSet());

                // Calculate all queue
                allMQ.addAll(sourceSplitChangeResult.increaseSet);
                allocatedSet
                        .keySet()
                        .forEach(
                                mq -> {
                                    if (!delete.contains(mq)) {
                                        allMQ.add(
                                                new RocketMQSourceSplit(
                                                        mq,
                                                        checkedOffsets.get(mq),
                                                        stoppingOffsets.getOrDefault(
                                                                mq,
                                                                RocketMQSourceSplit
                                                                        .NO_STOPPING_OFFSET)));
                                    }
                                });
                newSourceSplitAllocateMap =
                        this.allocateStrategy.allocate(
                                allMQ, context.currentParallelism(), partitionId);

                // Update cache
                assignedMap.clear();
                for (Map.Entry entry : newSourceSplitAllocateMap.entrySet()) {
                    assignedMap.put(
                            (Integer) entry.getKey(),
                            ((Set<RocketMQSourceSplit>) entry.getValue())
                                    .stream()
                                            .map(RocketMQSourceSplit::clone)
                                            .collect(Collectors.toSet()));
                }
                partitionId = allMQ.size();
            } else {
                newSourceSplitAllocateMap =
                        this.allocateStrategy.allocate(
                                sourceSplitChangeResult.getIncreaseSet(),
                                context.currentParallelism(),
                                partitionId);

                // Update cache
                newSourceSplitAllocateMap.forEach(
                        (k, v) ->
                                v.forEach(
                                        mq ->
                                                assignedMap
                                                        .computeIfAbsent(k, r -> new HashSet<>())
                                                        .add(mq)));
                partitionId += sourceSplitChangeResult.getIncreaseSet().size();
            }

            // Allocate deleted queues
            if (sourceSplitChangeResult.decreaseSet != null
                    && !sourceSplitChangeResult.decreaseSet.isEmpty()) {
                sourceSplitChangeResult.decreaseSet.forEach(
                        mq -> {
                            newSourceSplitAllocateMap
                                    .computeIfAbsent(
                                            reflectedQueueToTaskId.get(mq.getMessageQueue()),
                                            k -> new HashSet<>())
                                    .add(mq);
                            reflectedQueueToTaskId.remove(mq.getMessageQueue());
                        });
            }
        }

        {
            // Calculate the result after queue migration
            if (sourceSplitChangeResult.decreaseSet != null
                    && !sourceSplitChangeResult.decreaseSet.isEmpty()) {
                Map<Integer, Set<RocketMQSourceSplit>> migrationQueue = new HashMap<>();
                Map<Integer, Set<RocketMQSourceSplit>> noMigrationQueue = new HashMap<>();
                for (Map.Entry entry : newSourceSplitAllocateMap.entrySet()) {
                    int taskId = (int) entry.getKey();
                    Set<RocketMQSourceSplit> splits = (Set<RocketMQSourceSplit>) entry.getValue();
                    for (RocketMQSourceSplit split : splits) {
                        if (!split.getIsIncrease()) {
                            continue;
                        }
                        if (taskId != reflectedQueueToTaskId.get(split.getMessageQueue())) {
                            migrationQueue
                                    .computeIfAbsent(
                                            reflectedQueueToTaskId.get(split.getMessageQueue()),
                                            k -> new HashSet<>())
                                    .add(
                                            new RocketMQSourceSplit(
                                                    split.getMessageQueue(),
                                                    split.getStartingOffset(),
                                                    split.getStoppingOffset(),
                                                    false));
                        } else {
                            noMigrationQueue
                                    .computeIfAbsent(taskId, k -> new HashSet<>())
                                    .add(split);
                        }
                    }
                }

                // finally result
                migrationQueue.forEach(
                        (taskId, splits) -> {
                            newSourceSplitAllocateMap.get(taskId).addAll(splits);
                        });
                noMigrationQueue.forEach(
                        (taskId, splits) -> {
                            newSourceSplitAllocateMap.get(taskId).removeAll(splits);
                        });
            }
        }

        for (Map.Entry<Integer, Set<RocketMQSourceSplit>> entry :
                newSourceSplitAllocateMap.entrySet()) {
            this.pendingSplitAssignmentMap
                    .computeIfAbsent(entry.getKey(), r -> new HashSet<>())
                    .addAll(entry.getValue());
        }
    }