public RocketMQSourceEnumerator()

in src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java [126:165]


    public RocketMQSourceEnumerator(
            OffsetsSelector startingOffsetsSelector,
            OffsetsSelector stoppingOffsetsSelector,
            Boundedness boundedness,
            Configuration configuration,
            SplitEnumeratorContext<RocketMQSourceSplit> context,
            Set<MessageQueue> currentSplitAssignment) {
        this.configuration = configuration;
        this.context = context;
        this.boundedness = boundedness;
        this.lock = new SpinLock();

        // Support allocate splits to reader
        this.checkedOffsets = new ConcurrentHashMap<>();
        this.reflectedQueueToTaskId = new ConcurrentHashMap<>();
        this.pendingSplitAssignmentMap = new ConcurrentHashMap<>();
        this.allocatedSet = new ConcurrentHashMap<>();
        this.assignedMap = new ConcurrentHashMap<>();
        this.allocateStrategy =
                AllocateStrategyFactory.getStrategy(
                        configuration,
                        context,
                        new RocketMQSourceEnumState(currentSplitAssignment));

        // For rocketmq setting
        this.groupId = configuration.getString(RocketMQSourceOptions.CONSUMER_GROUP);
        this.startingOffsetsSelector = startingOffsetsSelector;
        this.stoppingOffsetsSelector = stoppingOffsetsSelector;
        this.partitionDiscoveryIntervalMs =
                configuration.getLong(RocketMQSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS);
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

        // Initialize the task status
        log.info(
                "Starting the RocketMQSourceEnumerator with current split assignment: {}",
                currentSplitAssignment);
        if (!currentSplitAssignment.isEmpty()) {
            this.initTask = new boolean[context.currentParallelism()];
        }
    }