in spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java [109:174]
/**
 * Builds a {@link DefaultLitePullConsumer} configured from the binder's consumer
 * properties. The consumer is only created and configured here; this method does
 * not start it.
 *
 * <p>If no consumer group is configured, an anonymous group name derived from the
 * topic is written back into the given properties before the consumer is created.
 *
 * @param topic the topic being consumed; used to detect DLQ topics and to derive
 * the anonymous group name when no group is configured
 * @param extendedConsumerProperties the binding's consumer properties, including
 * the RocketMQ-specific extension
 * @return the configured pull consumer
 * @throws IllegalArgumentException if no group is configured and {@code topic} is
 * a DLQ topic, or if the {@code nameServer} property is {@code null}
 */
public static DefaultLitePullConsumer initPullConsumer(
		String topic,
		ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties) {
	RocketMQConsumerProperties consumerProperties = extendedConsumerProperties
			.getExtension();

	boolean anonymous = !StringUtils.hasLength(consumerProperties.getGroup());
	if (anonymous) {
		// When using DLQ, at least the group property must be provided for proper
		// naming of the DLQ destination. See
		// https://docs.spring.io/spring-cloud-stream/docs/3.2.1/reference/html/spring-cloud-stream.html#spring-cloud-stream-reference
		if (NamespaceUtil.isDLQTopic(topic)) {
			throw new IllegalArgumentException(
					"group must be configured for DLQ: " + topic);
		}
		consumerProperties.setGroup(RocketMQUtils.anonymousGroup(topic));
	}
	Assert.notNull(consumerProperties.getNameServer(),
			"Property 'nameServer' is required");

	AllocateMessageQueueStrategy allocateMessageQueueStrategy = RocketMQBeanContainerCache
			.getBean(consumerProperties.getAllocateMessageQueueStrategy(),
					AllocateMessageQueueStrategy.class);

	// An ACL hook is only installed when both access key and secret key are set.
	RPCHook rpcHook = null;
	if (StringUtils.hasLength(consumerProperties.getAccessKey())
			&& StringUtils.hasLength(consumerProperties.getSecretKey())) {
		rpcHook = new AclClientRPCHook(
				new SessionCredentials(consumerProperties.getAccessKey(),
						consumerProperties.getSecretKey()));
	}

	DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(
			consumerProperties.getNamespace(), consumerProperties.getGroup(),
			rpcHook);
	// The VIP channel is never enabled when an ACL hook is in use.
	consumer.setVipChannelEnabled(
			null == rpcHook && consumerProperties.getVipChannelEnabled());
	consumer.setInstanceName(
			RocketMQUtils.getInstanceName(rpcHook, consumerProperties.getGroup()));
	// Keep the client's own default strategy when no custom bean was resolved.
	if (null != allocateMessageQueueStrategy) {
		consumer.setAllocateMessageQueueStrategy(allocateMessageQueueStrategy);
	}
	consumer.setNamesrvAddr(consumerProperties.getNameServer());
	consumer.setMessageModel(getMessageModel(consumerProperties.getMessageModel()));
	consumer.setNamespaceV2(consumerProperties.getNamespaceV2());
	consumer.setUseTLS(consumerProperties.getUseTLS());
	consumer.setPullTimeDelayMillsWhenException(
			consumerProperties.getPullTimeDelayMillsWhenException());
	consumer.setConsumerTimeoutMillisWhenSuspend(
			consumerProperties.getPull().getConsumerTimeoutMillisWhenSuspend());
	consumer.setPullBatchSize(consumerProperties.getPullBatchSize());
	consumer.setConsumeFromWhere(consumerProperties.getConsumeFromWhere());
	consumer.setHeartbeatBrokerInterval(
			consumerProperties.getHeartbeatBrokerInterval());
	consumer.setPersistConsumerOffsetInterval(
			consumerProperties.getPersistConsumerOffsetInterval());
	consumer.setPollTimeoutMillis(
			consumerProperties.getPull().getPollTimeoutMillis());
	// The binding's concurrency setting drives the number of pull threads.
	consumer.setPullThreadNums(extendedConsumerProperties.getConcurrency());
	// The internal queues are cached by a maximum of 1000
	consumer.setPullThresholdForAll(
			consumerProperties.getPull().getPullThresholdForAll());
	consumer.setUnitName(consumerProperties.getUnitName());
	consumer.setAccessChannel(AccessChannel.valueOf(consumerProperties.getAccessChannel()));
	return consumer;
}