in src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java [126:165]
public RocketMQSourceEnumerator(
OffsetsSelector startingOffsetsSelector,
OffsetsSelector stoppingOffsetsSelector,
Boundedness boundedness,
Configuration configuration,
SplitEnumeratorContext<RocketMQSourceSplit> context,
Set<MessageQueue> currentSplitAssignment) {
this.configuration = configuration;
this.context = context;
this.boundedness = boundedness;
this.lock = new SpinLock();
// Support allocate splits to reader
this.checkedOffsets = new ConcurrentHashMap<>();
this.reflectedQueueToTaskId = new ConcurrentHashMap<>();
this.pendingSplitAssignmentMap = new ConcurrentHashMap<>();
this.allocatedSet = new ConcurrentHashMap<>();
this.assignedMap = new ConcurrentHashMap<>();
this.allocateStrategy =
AllocateStrategyFactory.getStrategy(
configuration,
context,
new RocketMQSourceEnumState(currentSplitAssignment));
// For rocketmq setting
this.groupId = configuration.getString(RocketMQSourceOptions.CONSUMER_GROUP);
this.startingOffsetsSelector = startingOffsetsSelector;
this.stoppingOffsetsSelector = stoppingOffsetsSelector;
this.partitionDiscoveryIntervalMs =
configuration.getLong(RocketMQSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS);
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
// Initialize the task status
log.info(
"Starting the RocketMQSourceEnumerator with current split assignment: {}",
currentSplitAssignment);
if (!currentSplitAssignment.isEmpty()) {
this.initTask = new boolean[context.currentParallelism()];
}
}