public static DefaultMQPushConsumer initPushConsumer()

in spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java [52:101]


	/**
	 * Creates a {@link DefaultMQPushConsumer} and configures it from the RocketMQ
	 * extension of the given binding consumer properties.
	 * <p>
	 * The {@code group} and {@code nameServer} properties are checked with
	 * {@code Assert.notNull} before anything is built. An ACL hook is attached only
	 * when both an access key and a secret key are configured; in that case the VIP
	 * channel is always switched off. The binding's {@code concurrency} is applied as
	 * both the minimum and the maximum consume-thread count. This method only
	 * configures the consumer; it does not subscribe or start it.
	 * @param extendedConsumerProperties binding consumer properties whose extension
	 * carries the RocketMQ settings
	 * @return the configured push consumer
	 */
	public static DefaultMQPushConsumer initPushConsumer(
			ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties) {
		RocketMQConsumerProperties props = extendedConsumerProperties.getExtension();

		// Mandatory settings: neither may be null.
		String group = props.getGroup();
		Assert.notNull(group, "Property 'group' is required - consumerGroup");
		String nameServer = props.getNameServer();
		Assert.notNull(nameServer, "Property 'nameServer' is required");

		// Queue allocation strategy is looked up by its configured name;
		// AllocateMessageQueueAveragely is handed to the lookup as the default.
		AllocateMessageQueueStrategy queueStrategy = RocketMQBeanContainerCache.getBean(
				props.getAllocateMessageQueueStrategy(),
				AllocateMessageQueueStrategy.class, new AllocateMessageQueueAveragely());

		// ACL hook only when both credentials are non-empty; otherwise no hook.
		String accessKey = props.getAccessKey();
		String secretKey = props.getSecretKey();
		boolean credentialsPresent = StringUtils.hasLength(accessKey)
				&& StringUtils.hasLength(secretKey);
		RPCHook aclHook = credentialsPresent
				? new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))
				: null;

		DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(group, aclHook,
				queueStrategy, props.getEnableMsgTrace(),
				props.getCustomizedTraceTopic());

		// Client identity and connectivity.
		// The VIP channel is only honoured when no ACL hook is in use.
		boolean vipChannel = aclHook == null && props.getVipChannelEnabled();
		pushConsumer.setVipChannelEnabled(vipChannel);
		pushConsumer.setInstanceName(RocketMQUtils.getInstanceName(aclHook, group));
		pushConsumer.setNamespace(props.getNamespace());
		pushConsumer.setNamespaceV2(props.getNamespaceV2());
		pushConsumer.setNamesrvAddr(nameServer);
		pushConsumer.setMessageModel(getMessageModel(props.getMessageModel()));
		pushConsumer.setUseTLS(props.getUseTLS());

		// Pulling, offsets and heartbeats.
		pushConsumer.setPullTimeDelayMillsWhenException(
				props.getPullTimeDelayMillsWhenException());
		pushConsumer.setPullBatchSize(props.getPullBatchSize());
		pushConsumer.setConsumeFromWhere(props.getConsumeFromWhere());
		pushConsumer.setHeartbeatBrokerInterval(props.getHeartbeatBrokerInterval());
		pushConsumer.setPersistConsumerOffsetInterval(
				props.getPersistConsumerOffsetInterval());
		pushConsumer.setPullInterval(props.getPush().getPullInterval());

		// Fixed-size consume pool: min and max both come from the binding concurrency.
		int consumeThreads = extendedConsumerProperties.getConcurrency();
		pushConsumer.setConsumeThreadMin(consumeThreads);
		pushConsumer.setConsumeThreadMax(consumeThreads);

		// Remaining push-specific settings.
		pushConsumer.setUnitName(props.getUnitName());
		pushConsumer.setMaxReconsumeTimes(props.getPush().getMaxReconsumeTimes());
		pushConsumer.setConsumeTimeout(props.getPush().getConsumeTimeout());
		pushConsumer.setAccessChannel(
				AccessChannel.valueOf(props.getAccessChannel()));
		return pushConsumer;
	}