in src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategyFactory.java [36:56]
/**
 * Creates the {@link AllocateStrategy} selected by the {@link
 * RocketMQSourceOptions#ALLOCATE_MESSAGE_QUEUE_STRATEGY} option.
 *
 * <p>A new strategy instance is created on every call.
 *
 * @param rocketmqSourceOptions configuration the strategy name is read from
 * @param context enumerator context; accepted for API compatibility, not read by this method
 * @param enumState enumerator state; accepted for API compatibility, not read by this method
 * @return a new strategy instance matching the configured strategy name
 * @throws IllegalArgumentException if the strategy name is missing or is not one of the
 *     supported names
 */
public static AllocateStrategy getStrategy(
        Configuration rocketmqSourceOptions,
        SplitEnumeratorContext<RocketMQSourceSplit> context,
        RocketMQSourceEnumState enumState) {
    String allocateStrategyName =
            rocketmqSourceOptions.getString(
                    RocketMQSourceOptions.ALLOCATE_MESSAGE_QUEUE_STRATEGY);

    // Switching on a null String throws a bare NullPointerException; fail with a message
    // that names the missing option instead.
    if (allocateStrategyName == null) {
        throw new IllegalArgumentException(
                "No allocate strategy configured for option '"
                        + RocketMQSourceOptions.ALLOCATE_MESSAGE_QUEUE_STRATEGY.key()
                        + "'");
    }

    switch (allocateStrategyName) {
        case STRATEGY_NAME_CONSISTENT_HASH:
            return new ConsistentHashAllocateStrategy();
        case STRATEGY_NAME_BROADCAST:
            return new BroadcastAllocateStrategy();
        case STRATEGY_NAME_AVERAGE:
            return new AverageAllocateStrategy();
        default:
            throw new IllegalArgumentException(
                    "We don't support this allocate strategy: " + allocateStrategyName);
    }
}