in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java [239:355]
private void balance(
Map<String, Map<String, List<Partition>>> clusterState,
ConsumerInfoHolder consumerHolder,
BrokerRunManager brokerRunManager,
List<String> groupSet,
Map<String, Map<String, Map<String, Partition>>> oldClusterState,
Map<String, RebProcessInfo> rejGroupClientInfoMap) {
// according to group
for (String group : groupSet) {
ConsumeGroupInfo consumeGroupInfo = consumerHolder.getConsumeGroupInfo(group);
if (consumeGroupInfo == null || consumeGroupInfo.isClientBalance()) {
continue;
}
// filter consumer which don't need to handle
List<ConsumerInfo> consumerList = new ArrayList<>();
List<ConsumerInfo> consumerList1 = consumeGroupInfo.getConsumerInfoList();
RebProcessInfo rebProcessInfo = rejGroupClientInfoMap.get(group);
if (rebProcessInfo != null) {
for (ConsumerInfo consumerInfo : consumerList1) {
if (consumerInfo == null) {
continue;
}
if (rebProcessInfo.needProcessList.contains(consumerInfo.getConsumerId())
|| rebProcessInfo.needEscapeList.contains(consumerInfo.getConsumerId())) {
Map<String, List<Partition>> partitions2 =
clusterState.computeIfAbsent(
consumerInfo.getConsumerId(), k -> new HashMap<>());
Map<String, Map<String, Partition>> relation =
oldClusterState.get(consumerInfo.getConsumerId());
if (relation != null) {
for (String topic : relation.keySet()) {
partitions2.put(topic, new ArrayList<Partition>());
}
}
continue;
}
consumerList.add(consumerInfo);
}
} else {
consumerList = consumerList1;
}
if (CollectionUtils.isEmpty(consumerList)) {
continue;
}
// sort consumer and partitions, then mod
Set<String> topics = consumeGroupInfo.getTopicSet();
Map<String, Partition> psPartMap =
brokerRunManager.getSubBrokerAcceptSubParts(topics);
int min = psPartMap.size() / consumerList.size();
int max = psPartMap.size() % consumerList.size() == 0 ? min : min + 1;
int serverNumToLoadMax = psPartMap.size() % consumerList.size();
Queue<Partition> partitionToMove = new LinkedBlockingQueue<>();
Map<String, Integer> serverToTake = new HashMap<>();
for (ConsumerInfo consumer : consumerList) {
Map<String, List<Partition>> partitions =
clusterState.get(consumer.getConsumerId());
if (partitions == null) {
partitions = new HashMap<>();
}
int load = 0;
for (List<Partition> entry : partitions.values()) {
load += entry.size();
}
if (load < max) {
if (load == 0) {
serverToTake.put(consumer.getConsumerId(), max - load);
} else if (load < min) {
serverToTake.put(consumer.getConsumerId(), max - load);
}
continue;
}
int numToOffload;
if (serverNumToLoadMax > 0) {
serverNumToLoadMax--;
numToOffload = load - max;
} else {
numToOffload = load - min;
}
// calculate if current consumer partition need to release or add
for (List<Partition> entry : partitions.values()) {
if (entry.size() > numToOffload) {
int condition = numToOffload;
for (int i = 0; i < condition; i++) {
partitionToMove.add(entry.remove(0));
numToOffload--;
}
if (numToOffload <= 0) {
break;
}
} else {
numToOffload -= entry.size();
partitionToMove.addAll(entry);
entry.clear();
if (numToOffload <= 0) {
break;
}
}
}
}
// random allocate the rest partition
for (Entry<String, Integer> entry : serverToTake.entrySet()) {
for (int i = 0; i < entry.getValue() && partitionToMove.size() > 0; i++) {
Partition partition = partitionToMove.poll();
assign(partition, clusterState, entry.getKey());
}
}
// load balance partition between consumer
if (!partitionToMove.isEmpty()) {
for (String consumerId : serverToTake.keySet()) {
if (partitionToMove.isEmpty()) {
break;
}
assign(partitionToMove.poll(), clusterState, consumerId);
}
}
}
}