private void balance()

in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java [239:355]


    private void balance(
            Map<String, Map<String, List<Partition>>> clusterState,
            ConsumerInfoHolder consumerHolder,
            BrokerRunManager brokerRunManager,
            List<String> groupSet,
            Map<String, Map<String, Map<String, Partition>>> oldClusterState,
            Map<String, RebProcessInfo> rejGroupClientInfoMap) {
        // according to group
        for (String group : groupSet) {
            ConsumeGroupInfo consumeGroupInfo = consumerHolder.getConsumeGroupInfo(group);
            if (consumeGroupInfo == null || consumeGroupInfo.isClientBalance()) {
                continue;
            }
            // filter consumer which don't need to handle
            List<ConsumerInfo> consumerList = new ArrayList<>();
            List<ConsumerInfo> consumerList1 = consumeGroupInfo.getConsumerInfoList();
            RebProcessInfo rebProcessInfo = rejGroupClientInfoMap.get(group);
            if (rebProcessInfo != null) {
                for (ConsumerInfo consumerInfo : consumerList1) {
                    if (consumerInfo == null) {
                        continue;
                    }
                    if (rebProcessInfo.needProcessList.contains(consumerInfo.getConsumerId())
                            || rebProcessInfo.needEscapeList.contains(consumerInfo.getConsumerId())) {
                        Map<String, List<Partition>> partitions2 =
                                clusterState.computeIfAbsent(
                                        consumerInfo.getConsumerId(), k -> new HashMap<>());
                        Map<String, Map<String, Partition>> relation =
                                oldClusterState.get(consumerInfo.getConsumerId());
                        if (relation != null) {
                            for (String topic : relation.keySet()) {
                                partitions2.put(topic, new ArrayList<Partition>());
                            }
                        }
                        continue;
                    }
                    consumerList.add(consumerInfo);
                }
            } else {
                consumerList = consumerList1;
            }
            if (CollectionUtils.isEmpty(consumerList)) {
                continue;
            }
            // sort consumer and partitions, then mod
            Set<String> topics = consumeGroupInfo.getTopicSet();
            Map<String, Partition> psPartMap =
                    brokerRunManager.getSubBrokerAcceptSubParts(topics);
            int min = psPartMap.size() / consumerList.size();
            int max = psPartMap.size() % consumerList.size() == 0 ? min : min + 1;
            int serverNumToLoadMax = psPartMap.size() % consumerList.size();
            Queue<Partition> partitionToMove = new LinkedBlockingQueue<>();
            Map<String, Integer> serverToTake = new HashMap<>();
            for (ConsumerInfo consumer : consumerList) {
                Map<String, List<Partition>> partitions =
                        clusterState.get(consumer.getConsumerId());
                if (partitions == null) {
                    partitions = new HashMap<>();
                }
                int load = 0;
                for (List<Partition> entry : partitions.values()) {
                    load += entry.size();
                }
                if (load < max) {
                    if (load == 0) {
                        serverToTake.put(consumer.getConsumerId(), max - load);
                    } else if (load < min) {
                        serverToTake.put(consumer.getConsumerId(), max - load);
                    }
                    continue;
                }
                int numToOffload;
                if (serverNumToLoadMax > 0) {
                    serverNumToLoadMax--;
                    numToOffload = load - max;
                } else {
                    numToOffload = load - min;
                }
                // calculate if current consumer partition need to release or add
                for (List<Partition> entry : partitions.values()) {
                    if (entry.size() > numToOffload) {
                        int condition = numToOffload;
                        for (int i = 0; i < condition; i++) {
                            partitionToMove.add(entry.remove(0));
                            numToOffload--;
                        }
                        if (numToOffload <= 0) {
                            break;
                        }
                    } else {
                        numToOffload -= entry.size();
                        partitionToMove.addAll(entry);
                        entry.clear();
                        if (numToOffload <= 0) {
                            break;
                        }
                    }
                }
            }
            // random allocate the rest partition
            for (Entry<String, Integer> entry : serverToTake.entrySet()) {
                for (int i = 0; i < entry.getValue() && partitionToMove.size() > 0; i++) {
                    Partition partition = partitionToMove.poll();
                    assign(partition, clusterState, entry.getKey());
                }
            }
            // load balance partition between consumer
            if (!partitionToMove.isEmpty()) {
                for (String consumerId : serverToTake.keySet()) {
                    if (partitionToMove.isEmpty()) {
                        break;
                    }
                    assign(partitionToMove.poll(), clusterState, consumerId);
                }
            }
        }
    }