public static DefaultLitePullConsumer initPullConsumer()

in spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java [109:174]


	/**
	 * Builds a {@link DefaultLitePullConsumer} for the given topic from the binder's
	 * consumer properties.
	 *
	 * <p>When no consumer group is configured, a group name is generated with
	 * {@code RocketMQUtils.anonymousGroup(topic)} and written back into the extension
	 * properties, so the passed-in properties object is mutated in that case. This
	 * method only configures the consumer instance; it does not subscribe it to the
	 * topic or start it.
	 *
	 * @param topic the destination name; used for the DLQ check and, for anonymous
	 * consumers, to derive the generated group name
	 * @param extendedConsumerProperties the binding's consumer properties, including
	 * the RocketMQ-specific extension and the concurrency used as the pull thread count
	 * @return a configured consumer instance
	 * @throws RuntimeException if no group is configured and {@code topic} is a DLQ
	 * topic according to {@code NamespaceUtil.isDLQTopic}
	 * @throws IllegalArgumentException if the {@code nameServer} property is
	 * {@code null} (assuming {@code Assert} is Spring's {@code Assert} - confirm), or
	 * if the configured access channel is not a valid {@code AccessChannel} constant
	 */
	public static DefaultLitePullConsumer initPullConsumer(
			String topic,
			ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties) {
		RocketMQConsumerProperties consumerProperties = extendedConsumerProperties
				.getExtension();
		boolean anonymous = !StringUtils.hasLength(consumerProperties.getGroup());
		/*
		 * When using DLQ, at least the group property must be provided for proper
		 * naming of the DLQ destination. According to
		 * https://docs.spring.io/spring-cloud-stream/docs/3.2.1/reference/html/spring-cloud-stream.html#spring-cloud-stream-reference
		 */
		if (anonymous && NamespaceUtil.isDLQTopic(topic)) {
			// Keep a separator before the topic so the name is readable in the message.
			throw new RuntimeException(
					"group must be configured for DLQ topic: " + topic);
		}
		if (anonymous) {
			// Anonymous consumers get a generated group derived from the topic.
			consumerProperties.setGroup(RocketMQUtils.anonymousGroup(topic));
		}

		Assert.notNull(consumerProperties.getNameServer(),
				"Property 'nameServer' is required");
		AllocateMessageQueueStrategy allocateMessageQueueStrategy = RocketMQBeanContainerCache
				.getBean(consumerProperties.getAllocateMessageQueueStrategy(),
						AllocateMessageQueueStrategy.class);

		// An ACL hook is only created when both the access key and secret key are set.
		RPCHook rpcHook = null;
		if (StringUtils.hasLength(consumerProperties.getAccessKey())
				&& StringUtils.hasLength(consumerProperties.getSecretKey())) {
			rpcHook = new AclClientRPCHook(
					new SessionCredentials(consumerProperties.getAccessKey(),
							consumerProperties.getSecretKey()));
		}

		DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(
				consumerProperties.getNamespace(), consumerProperties.getGroup(),
				rpcHook);
		// The VIP channel is never enabled when an ACL hook is in use.
		consumer.setVipChannelEnabled(
				null == rpcHook && consumerProperties.getVipChannelEnabled());
		consumer.setInstanceName(
				RocketMQUtils.getInstanceName(rpcHook, consumerProperties.getGroup()));
		// Only override the consumer's own allocation strategy when a bean was resolved.
		if (null != allocateMessageQueueStrategy) {
			consumer.setAllocateMessageQueueStrategy(allocateMessageQueueStrategy);
		}
		consumer.setNamesrvAddr(consumerProperties.getNameServer());
		consumer.setMessageModel(getMessageModel(consumerProperties.getMessageModel()));
		consumer.setNamespaceV2(consumerProperties.getNamespaceV2());
		consumer.setUseTLS(consumerProperties.getUseTLS());
		consumer.setPullTimeDelayMillsWhenException(
				consumerProperties.getPullTimeDelayMillsWhenException());
		consumer.setConsumerTimeoutMillisWhenSuspend(
				consumerProperties.getPull().getConsumerTimeoutMillisWhenSuspend());
		consumer.setPullBatchSize(consumerProperties.getPullBatchSize());
		consumer.setConsumeFromWhere(consumerProperties.getConsumeFromWhere());
		consumer.setHeartbeatBrokerInterval(
				consumerProperties.getHeartbeatBrokerInterval());
		consumer.setPersistConsumerOffsetInterval(
				consumerProperties.getPersistConsumerOffsetInterval());
		consumer.setPollTimeoutMillis(
				consumerProperties.getPull().getPollTimeoutMillis());
		// The binding's concurrency setting drives the number of pull threads.
		consumer.setPullThreadNums(extendedConsumerProperties.getConcurrency());
		// Threshold across all internal queues, taken from the pull properties.
		// NOTE(review): the original comment stated a maximum of 1000 - confirm the default.
		consumer.setPullThresholdForAll(
				consumerProperties.getPull().getPullThresholdForAll());
		consumer.setUnitName(consumerProperties.getUnitName());
		consumer.setAccessChannel(AccessChannel.valueOf(consumerProperties.getAccessChannel()));
		return consumer;
	}