in src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java [588:618]
/**
 * Records the splits that a reader reports in its {@link SourceInitAssignEvent}.
 *
 * <p>The event is ignored when {@code initTask} is {@code null} or when the flag for
 * {@code taskId} is already {@code true}. Otherwise, while holding {@code lock}, every
 * reported split is added to {@code assignedMap} under {@code taskId}, its stopping offset
 * is stored in {@code checkedOffsets} keyed by the split's message queue, the queue is
 * mapped to {@code taskId} in {@code reflectedQueueToTaskId}, and the queue is marked in
 * {@code allocatedSet}. The flag {@code initTask[taskId]} is then set to {@code true},
 * whether or not the event carried any splits.
 *
 * @param taskId index of the reader that sent the event; used to index {@code initTask}
 * @param initAssignEvent event carrying the reader's splits; its split collection may be
 *     {@code null} or empty
 */
private void handleInitAssignEvent(int taskId, SourceInitAssignEvent initAssignEvent) {
    // No per-task init flags exist, so there is nothing to synchronize.
    if (this.initTask == null) {
        return;
    }
    // The flag for this reader is already set; skip the event.
    // NOTE(review): this check happens before the lock is taken - confirm that callers
    // never deliver events for the same taskId concurrently.
    if (this.initTask[taskId]) {
        return;
    }

    lock.lock();
    try {
        // Only touch the bookkeeping maps when the reader actually reported splits.
        final boolean reportedSplits =
                initAssignEvent.getSplits() != null
                        && !initAssignEvent.getSplits().isEmpty();
        if (reportedSplits) {
            log.info(
                    "Received SourceInitAssignEvent from reader {} with {} splits.",
                    taskId,
                    initAssignEvent.getSplits().toString());
            initAssignEvent
                    .getSplits()
                    .forEach(
                            reported -> {
                                // Remember the split under the reader that reported it.
                                this.assignedMap
                                        .computeIfAbsent(taskId, id -> new HashSet<>())
                                        .add(reported);
                                // Store the split's stopping offset for its queue.
                                this.checkedOffsets.put(
                                        reported.getMessageQueue(),
                                        reported.getStoppingOffset());
                                // Map the queue to the reader that reported it.
                                this.reflectedQueueToTaskId.put(
                                        reported.getMessageQueue(), taskId);
                                // Mark the queue in allocatedSet.
                                this.allocatedSet.put(reported.getMessageQueue(), (byte) 1);
                            });
        }
        // Set the flag even when the event carried no splits.
        this.initTask[taskId] = true;
    } finally {
        lock.unlock();
    }
}