in src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java [401:538]
/**
 * Computes the split assignment implied by {@code sourceSplitChangeResult} and stages it in
 * {@code pendingSplitAssignmentMap} for later dispatch to readers.
 *
 * <p>Two paths exist. When queues were removed ({@code decreaseSet} non-empty) the whole
 * assignment is rebuilt: surviving queues plus new queues are re-allocated across the current
 * parallelism, the {@code assignedMap} cache is refreshed with defensive clones, and each removed
 * queue is routed back to its current owner so that reader can stop it. When only queues were
 * added, just the increase set is allocated on top of the existing assignment. A second pass then
 * reconciles migrations: a queue whose new owner differs from its previous owner gets a
 * revocation split (increase flag {@code false}) added to the old owner, while queues staying
 * with their current owner are dropped from the new assignment (already assigned).
 *
 * @param sourceSplitChangeResult the added ({@code increaseSet}) and removed
 *     ({@code decreaseSet}) splits discovered since the last calculation
 */
private void calculateSplitAssignment(SourceSplitChangeResult sourceSplitChangeResult) {
    Map<Integer, Set<RocketMQSourceSplit>> newSourceSplitAllocateMap;
    // Preliminary calculation of distribution results
    {
        // Allocate valid queues
        if (sourceSplitChangeResult.decreaseSet != null
                && !sourceSplitChangeResult.decreaseSet.isEmpty()) {
            // Queues were removed: reset and re-load-balance everything from scratch.
            partitionId = 0;
            Set<RocketMQSourceSplit> allMQ = new HashSet<>();
            OffsetsSelector.MessageQueueOffsetsRetriever offsetsRetriever =
                    new InnerConsumerImpl.RemotingOffsetsRetrieverImpl(consumer);
            Map<MessageQueue, Long> stoppingOffsets =
                    stoppingOffsetsSelector.getMessageQueueOffsets(
                            allocatedSet.keySet(), offsetsRetriever);
            Set<MessageQueue> delete =
                    sourceSplitChangeResult.decreaseSet.stream()
                            .map(RocketMQSourceSplit::getMessageQueue)
                            .collect(Collectors.toSet());
            // Calculate all queues that should remain assigned: the newly discovered
            // splits plus every previously allocated queue not marked for deletion.
            allMQ.addAll(sourceSplitChangeResult.increaseSet);
            allocatedSet
                    .keySet()
                    .forEach(
                            mq -> {
                                if (!delete.contains(mq)) {
                                    allMQ.add(
                                            new RocketMQSourceSplit(
                                                    mq,
                                                    checkedOffsets.get(mq),
                                                    stoppingOffsets.getOrDefault(
                                                            mq,
                                                            RocketMQSourceSplit
                                                                    .NO_STOPPING_OFFSET)));
                                }
                            });
            newSourceSplitAllocateMap =
                    this.allocateStrategy.allocate(
                            allMQ, context.currentParallelism(), partitionId);
            // Update cache: snapshot the fresh assignment with defensive clones so later
            // mutation of the allocation result cannot corrupt the cache.
            assignedMap.clear();
            for (Map.Entry<Integer, Set<RocketMQSourceSplit>> entry :
                    newSourceSplitAllocateMap.entrySet()) {
                assignedMap.put(
                        entry.getKey(),
                        entry.getValue().stream()
                                .map(RocketMQSourceSplit::clone)
                                .collect(Collectors.toSet()));
            }
            partitionId = allMQ.size();
        } else {
            // Additions only: allocate just the increased queues, continuing from the
            // current partitionId so existing assignments keep their positions.
            newSourceSplitAllocateMap =
                    this.allocateStrategy.allocate(
                            sourceSplitChangeResult.getIncreaseSet(),
                            context.currentParallelism(),
                            partitionId);
            // Update cache incrementally (no clearing — prior assignments still hold).
            newSourceSplitAllocateMap.forEach(
                    (k, v) ->
                            v.forEach(
                                    mq ->
                                            assignedMap
                                                    .computeIfAbsent(k, r -> new HashSet<>())
                                                    .add(mq)));
            partitionId += sourceSplitChangeResult.getIncreaseSet().size();
        }
        // Allocate deleted queues: route each removed split back to the subtask that
        // currently owns it so that reader can be told to stop consuming it.
        if (sourceSplitChangeResult.decreaseSet != null
                && !sourceSplitChangeResult.decreaseSet.isEmpty()) {
            sourceSplitChangeResult.decreaseSet.forEach(
                    mq -> {
                        newSourceSplitAllocateMap
                                .computeIfAbsent(
                                        reflectedQueueToTaskId.get(mq.getMessageQueue()),
                                        k -> new HashSet<>())
                                .add(mq);
                        reflectedQueueToTaskId.remove(mq.getMessageQueue());
                    });
        }
    }
    {
        // Calculate the result after queue migration
        if (sourceSplitChangeResult.decreaseSet != null
                && !sourceSplitChangeResult.decreaseSet.isEmpty()) {
            Map<Integer, Set<RocketMQSourceSplit>> migrationQueue = new HashMap<>();
            Map<Integer, Set<RocketMQSourceSplit>> noMigrationQueue = new HashMap<>();
            for (Map.Entry<Integer, Set<RocketMQSourceSplit>> entry :
                    newSourceSplitAllocateMap.entrySet()) {
                int taskId = entry.getKey();
                for (RocketMQSourceSplit split : entry.getValue()) {
                    // Only "increase" splits participate in migration reconciliation;
                    // revocation splits added above are passed through untouched.
                    if (!split.getIsIncrease()) {
                        continue;
                    }
                    // NOTE(review): this unboxing NPEs if the queue has no previous owner
                    // in reflectedQueueToTaskId — confirm every increase split is
                    // registered there before this method runs.
                    if (taskId != reflectedQueueToTaskId.get(split.getMessageQueue())) {
                        // Owner changed: queue the revocation (increase flag = false)
                        // for the OLD owner so it releases the queue.
                        migrationQueue
                                .computeIfAbsent(
                                        reflectedQueueToTaskId.get(split.getMessageQueue()),
                                        k -> new HashSet<>())
                                .add(
                                        new RocketMQSourceSplit(
                                                split.getMessageQueue(),
                                                split.getStartingOffset(),
                                                split.getStoppingOffset(),
                                                false));
                    } else {
                        // Same owner as before: nothing to (re)assign.
                        noMigrationQueue
                                .computeIfAbsent(taskId, k -> new HashSet<>())
                                .add(split);
                    }
                }
            }
            // Final result: add revocations to the old owners, drop already-owned splits.
            // NOTE(review): get(taskId) assumes the old owner already has an entry in the
            // new allocation map — verify, else this NPEs on addAll.
            migrationQueue.forEach(
                    (taskId, splits) -> {
                        newSourceSplitAllocateMap.get(taskId).addAll(splits);
                    });
            noMigrationQueue.forEach(
                    (taskId, splits) -> {
                        newSourceSplitAllocateMap.get(taskId).removeAll(splits);
                    });
        }
    }
    // Stage the computed assignment; the actual dispatch to readers happens elsewhere.
    for (Map.Entry<Integer, Set<RocketMQSourceSplit>> entry :
            newSourceSplitAllocateMap.entrySet()) {
        this.pendingSplitAssignmentMap
                .computeIfAbsent(entry.getKey(), r -> new HashSet<>())
                .addAll(entry.getValue());
    }
}