in spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java [52:101]
/**
 * Creates a {@link DefaultMQPushConsumer} and configures it from the binding's
 * consumer properties. The consumer is only built and configured here; this
 * method does not start it.
 *
 * <p>An ACL {@link RPCHook} is attached only when both the access key and the
 * secret key have length. When such a hook is present the VIP channel is always
 * disabled, regardless of the configured value.
 *
 * <p>Both the minimum and maximum consume thread counts are set to the
 * binding's {@code concurrency}.
 *
 * @param extendedConsumerProperties binding-level consumer properties whose
 *        extension carries the RocketMQ-specific settings
 * @return the configured push consumer
 */
public static DefaultMQPushConsumer initPushConsumer(
        ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties) {
    final RocketMQConsumerProperties props = extendedConsumerProperties.getExtension();

    // Mandatory settings: fail before any client object is created.
    final String group = props.getGroup();
    Assert.notNull(group, "Property 'group' is required - consumerGroup");
    Assert.notNull(props.getNameServer(), "Property 'nameServer' is required");

    // Look up the configured queue allocation strategy, passing the
    // averaging strategy as the default.
    final AllocateMessageQueueStrategy queueStrategy = RocketMQBeanContainerCache.getBean(
            props.getAllocateMessageQueueStrategy(),
            AllocateMessageQueueStrategy.class,
            new AllocateMessageQueueAveragely());

    // ACL is enabled only when both credentials are present.
    final String accessKey = props.getAccessKey();
    final String secretKey = props.getSecretKey();
    final boolean aclConfigured = StringUtils.hasLength(accessKey)
            && StringUtils.hasLength(secretKey);
    final RPCHook aclHook = aclConfigured
            ? new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))
            : null;

    final DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(
            group, aclHook, queueStrategy,
            props.getEnableMsgTrace(),
            props.getCustomizedTraceTopic());

    // NOTE(review): the setter order below mirrors the original sequence on
    // purpose; the namespace / name-server setters may interact inside the
    // client, so confirm against the RocketMQ client before reordering.
    pushConsumer.setVipChannelEnabled(!aclConfigured && props.getVipChannelEnabled());
    pushConsumer.setInstanceName(RocketMQUtils.getInstanceName(aclHook, group));
    pushConsumer.setNamespace(props.getNamespace());
    pushConsumer.setNamespaceV2(props.getNamespaceV2());
    pushConsumer.setNamesrvAddr(props.getNameServer());
    pushConsumer.setMessageModel(getMessageModel(props.getMessageModel()));
    pushConsumer.setUseTLS(props.getUseTLS());
    pushConsumer.setPullTimeDelayMillsWhenException(
            props.getPullTimeDelayMillsWhenException());
    pushConsumer.setPullBatchSize(props.getPullBatchSize());
    pushConsumer.setConsumeFromWhere(props.getConsumeFromWhere());
    pushConsumer.setHeartbeatBrokerInterval(props.getHeartbeatBrokerInterval());
    pushConsumer.setPersistConsumerOffsetInterval(
            props.getPersistConsumerOffsetInterval());
    pushConsumer.setPullInterval(props.getPush().getPullInterval());

    // Fixed-size consume pool: min and max both follow the binding concurrency.
    final int concurrency = extendedConsumerProperties.getConcurrency();
    pushConsumer.setConsumeThreadMin(concurrency);
    pushConsumer.setConsumeThreadMax(concurrency);

    pushConsumer.setUnitName(props.getUnitName());
    pushConsumer.setMaxReconsumeTimes(props.getPush().getMaxReconsumeTimes());
    pushConsumer.setConsumeTimeout(props.getPush().getConsumeTimeout());
    pushConsumer.setAccessChannel(AccessChannel.valueOf(props.getAccessChannel()));
    return pushConsumer;
}