in src/main/java/org/apache/flink/connector/rocketmq/legacy/common/util/RocketMQUtils.java [59:80]
/**
 * Computes the contiguous slice of message queues assigned to one parallel task.
 *
 * <p>The queues are copied into a list and sorted by their natural ordering, so the result does
 * not depend on the iteration order of {@code mqSet}. The sorted list is then cut into
 * consecutive ranges: the first {@code size % numberOfParallelTasks} tasks receive one queue more
 * than the remaining tasks. When there are no more queues than tasks, the leading tasks receive
 * exactly one queue each and the others receive none.
 *
 * @param mqSet queues to distribute; the collection itself is not modified
 * @param numberOfParallelTasks total number of tasks sharing the queues; must be positive
 * @param indexOfThisTask zero-based index of the task to allocate for; must not be negative. An
 *     index greater than or equal to {@code numberOfParallelTasks} yields an empty list.
 * @return a new, mutable list holding the queues for this task; empty if the task gets none
 * @throws IllegalArgumentException if {@code numberOfParallelTasks} is not positive or {@code
 *     indexOfThisTask} is negative
 */
public static List<MessageQueue> allocate(
        Collection<MessageQueue> mqSet, int numberOfParallelTasks, int indexOfThisTask) {
    // Fail fast with a descriptive message instead of a bare division-by-zero error.
    if (numberOfParallelTasks <= 0) {
        throw new IllegalArgumentException(
                "numberOfParallelTasks must be positive, but was " + numberOfParallelTasks);
    }
    // A negative index would produce a negative start offset, which previously either threw an
    // index error or silently returned a queue belonging to another task.
    if (indexOfThisTask < 0) {
        throw new IllegalArgumentException(
                "indexOfThisTask must not be negative, but was " + indexOfThisTask);
    }
    // An index past the last task never owns a queue; answer directly so the offset arithmetic
    // below only ever runs on in-range indices.
    if (indexOfThisTask >= numberOfParallelTasks) {
        return new ArrayList<>();
    }

    List<MessageQueue> mqAll = new ArrayList<>(mqSet);
    Collections.sort(mqAll);

    int total = mqAll.size();
    int mod = total % numberOfParallelTasks;
    // Tasks with an index below the remainder take one extra queue each.
    boolean takesExtra = indexOfThisTask < mod;

    int averageSize =
            total <= numberOfParallelTasks
                    ? 1
                    : total / numberOfParallelTasks + (takesExtra ? 1 : 0);
    // Tasks after the "extra" group must skip the extra queues handed out before them.
    int startIndex =
            takesExtra ? indexOfThisTask * averageSize : indexOfThisTask * averageSize + mod;

    // Clamp to the end of the list; a non-positive range means this task gets nothing.
    int range = Math.min(averageSize, total - startIndex);
    if (range <= 0) {
        return new ArrayList<>();
    }
    // Copy the view so the caller gets an independent, mutable list.
    return new ArrayList<>(mqAll.subList(startIndex, startIndex + range));
}