private void handleInitAssignEvent()

in src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java [588:618]


    private void handleInitAssignEvent(int taskId, SourceInitAssignEvent initAssignEvent) {
        if (this.initTask == null || this.initTask[taskId]) {
            return;
        }
        lock.lock();
        try {
            // sync assign result
            if (initAssignEvent.getSplits() != null && !initAssignEvent.getSplits().isEmpty()) {
                log.info(
                        "Received SourceInitAssignEvent from reader {} with {} splits.",
                        taskId,
                        initAssignEvent.getSplits().toString());
                initAssignEvent
                        .getSplits()
                        .forEach(
                                split -> {
                                    this.assignedMap
                                            .computeIfAbsent(taskId, r -> new HashSet<>())
                                            .add(split);
                                    this.checkedOffsets.put(
                                            split.getMessageQueue(), split.getStoppingOffset());
                                    this.reflectedQueueToTaskId.put(
                                            split.getMessageQueue(), taskId);
                                    this.allocatedSet.put(split.getMessageQueue(), (byte) 1);
                                });
            }
            this.initTask[taskId] = true;
        } finally {
            lock.unlock();
        }
    }