public static DefaultMQProducer initRocketMQProducer()

in spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java [62:142]


	public static DefaultMQProducer initRocketMQProducer(String topic,
			RocketMQProducerProperties producerProperties) {
		if (!StringUtils.hasLength(producerProperties.getGroup())) {
			producerProperties.setGroup(RocketMQConst.DEFAULT_GROUP);
		}

		Assert.notNull(producerProperties.getNameServer(),
				"Property 'nameServer' is required");

		RPCHook rpcHook = null;
		if (StringUtils.hasLength(producerProperties.getAccessKey())
				&& StringUtils.hasLength(producerProperties.getSecretKey())) {
			rpcHook = new AclClientRPCHook(
					new SessionCredentials(producerProperties.getAccessKey(),
							producerProperties.getSecretKey()));
		}
		DefaultMQProducer producer;
		if (RocketMQProducerProperties.ProducerType.Trans
				.equalsName(producerProperties.getProducerType())) {
			producer = new TransactionMQProducer(producerProperties.getNamespace(),
					producerProperties.getGroup(), rpcHook, producerProperties.getEnableMsgTrace(),
					producerProperties.getCustomizedTraceTopic());
			if (producerProperties.getEnableMsgTrace()) {
				try {
					AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(
							producerProperties.getGroup(), TraceDispatcher.Type.PRODUCE, 10,
							producerProperties.getCustomizedTraceTopic(), rpcHook);
					dispatcher.setHostProducer(producer.getDefaultMQProducerImpl());
					Field field = DefaultMQProducer.class
							.getDeclaredField("traceDispatcher");
					field.setAccessible(true);
					field.set(producer, dispatcher);
					producer.getDefaultMQProducerImpl().registerSendMessageHook(
							new SendMessageTraceHookImpl(dispatcher));
				}
				catch (Throwable e) {
					log.error(
							"system mq-trace hook init failed ,maybe can't send msg trace data");
				}
			}
		}
		else {
			producer = new DefaultMQProducer(producerProperties.getNamespace(),
					producerProperties.getGroup(), rpcHook,
					producerProperties.getEnableMsgTrace(),
					producerProperties.getCustomizedTraceTopic());
		}

		producer.setVipChannelEnabled(
				null == rpcHook && producerProperties.getVipChannelEnabled());
		producer.setInstanceName(
				RocketMQUtils.getInstanceName(rpcHook, topic + "|" + UtilAll.getPid()));
		producer.setNamesrvAddr(producerProperties.getNameServer());
		producer.setNamespaceV2(producerProperties.getNamespaceV2());
		producer.setSendMsgTimeout(producerProperties.getSendMsgTimeout());
		producer.setRetryTimesWhenSendFailed(
				producerProperties.getRetryTimesWhenSendFailed());
		producer.setRetryTimesWhenSendAsyncFailed(
				producerProperties.getRetryTimesWhenSendAsyncFailed());
		producer.setCompressMsgBodyOverHowmuch(
				producerProperties.getCompressMsgBodyThreshold());
		producer.setRetryAnotherBrokerWhenNotStoreOK(
				producerProperties.getRetryAnotherBroker());
		producer.setMaxMessageSize(producerProperties.getMaxMessageSize());
		producer.setUseTLS(producerProperties.getUseTLS());
		producer.setUnitName(producerProperties.getUnitName());
		producer.setAccessChannel(AccessChannel.valueOf(producerProperties.getAccessChannel()));
		CheckForbiddenHook checkForbiddenHook = RocketMQBeanContainerCache.getBean(
				producerProperties.getCheckForbiddenHook(), CheckForbiddenHook.class);
		if (null != checkForbiddenHook) {
			producer.getDefaultMQProducerImpl()
					.registerCheckForbiddenHook(checkForbiddenHook);
		}
		SendMessageHook sendMessageHook = RocketMQBeanContainerCache
				.getBean(producerProperties.getSendMessageHook(), SendMessageHook.class);
		if (null != sendMessageHook) {
			producer.getDefaultMQProducerImpl().registerSendMessageHook(sendMessageHook);
		}

		return producer;
	}