in spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java [62:142]
/**
 * Creates and configures a RocketMQ producer for the given topic.
 *
 * <p>If no producer group is configured, {@code producerProperties} is updated in
 * place with {@code RocketMQConst.DEFAULT_GROUP}. The {@code nameServer} property
 * must not be {@code null}; this is enforced with {@code Assert.notNull}. When both
 * an access key and a secret key are present, an {@code AclClientRPCHook} built from
 * them is passed to the producer. A {@code TransactionMQProducer} is created when the
 * configured producer type matches {@code ProducerType.Trans}; otherwise a plain
 * {@code DefaultMQProducer} is created. This method only builds and configures the
 * producer; it does not call {@code start()} on it.
 *
 * @param topic topic name, used as part of the producer instance name
 * @param producerProperties producer settings; its group may be modified as
 * described above
 * @return the configured producer
 */
public static DefaultMQProducer initRocketMQProducer(String topic,
		RocketMQProducerProperties producerProperties) {
	// Fall back to the default group; note this writes back into the caller's object.
	if (!StringUtils.hasLength(producerProperties.getGroup())) {
		producerProperties.setGroup(RocketMQConst.DEFAULT_GROUP);
	}
	Assert.notNull(producerProperties.getNameServer(),
			"Property 'nameServer' is required");

	// An ACL hook is only created when both credentials are supplied.
	RPCHook rpcHook = null;
	if (StringUtils.hasLength(producerProperties.getAccessKey())
			&& StringUtils.hasLength(producerProperties.getSecretKey())) {
		rpcHook = new AclClientRPCHook(
				new SessionCredentials(producerProperties.getAccessKey(),
						producerProperties.getSecretKey()));
	}

	DefaultMQProducer producer;
	if (RocketMQProducerProperties.ProducerType.Trans
			.equalsName(producerProperties.getProducerType())) {
		producer = new TransactionMQProducer(producerProperties.getNamespace(),
				producerProperties.getGroup(), rpcHook,
				producerProperties.getEnableMsgTrace(),
				producerProperties.getCustomizedTraceTopic());
		if (producerProperties.getEnableMsgTrace()) {
			// Install a trace dispatcher by hand: it is written into the private
			// "traceDispatcher" field of DefaultMQProducer via reflection and a
			// SendMessageTraceHookImpl wrapping it is registered.
			// NOTE(review): the constructor above already receives the trace flag
			// and topic - confirm this does not register a second trace hook with
			// the RocketMQ client version in use.
			try {
				AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(
						producerProperties.getGroup(), TraceDispatcher.Type.PRODUCE, 10,
						producerProperties.getCustomizedTraceTopic(), rpcHook);
				dispatcher.setHostProducer(producer.getDefaultMQProducerImpl());
				Field field = DefaultMQProducer.class
						.getDeclaredField("traceDispatcher");
				field.setAccessible(true);
				field.set(producer, dispatcher);
				producer.getDefaultMQProducerImpl().registerSendMessageHook(
						new SendMessageTraceHookImpl(dispatcher));
			}
			catch (Throwable e) {
				// Best effort: tracing is optional, so the producer is still
				// returned, but the cause is logged so the failure can be diagnosed.
				log.error(
						"system mq-trace hook init failed ,maybe can't send msg trace data",
						e);
			}
		}
	}
	else {
		producer = new DefaultMQProducer(producerProperties.getNamespace(),
				producerProperties.getGroup(), rpcHook,
				producerProperties.getEnableMsgTrace(),
				producerProperties.getCustomizedTraceTopic());
	}

	// The VIP channel is enabled only when no ACL hook is in use and the property
	// requests it.
	producer.setVipChannelEnabled(
			null == rpcHook && producerProperties.getVipChannelEnabled());
	// Instance name is derived from the hook, the topic and the current process id.
	producer.setInstanceName(
			RocketMQUtils.getInstanceName(rpcHook, topic + "|" + UtilAll.getPid()));
	producer.setNamesrvAddr(producerProperties.getNameServer());
	producer.setNamespaceV2(producerProperties.getNamespaceV2());
	producer.setSendMsgTimeout(producerProperties.getSendMsgTimeout());
	producer.setRetryTimesWhenSendFailed(
			producerProperties.getRetryTimesWhenSendFailed());
	producer.setRetryTimesWhenSendAsyncFailed(
			producerProperties.getRetryTimesWhenSendAsyncFailed());
	producer.setCompressMsgBodyOverHowmuch(
			producerProperties.getCompressMsgBodyThreshold());
	producer.setRetryAnotherBrokerWhenNotStoreOK(
			producerProperties.getRetryAnotherBroker());
	producer.setMaxMessageSize(producerProperties.getMaxMessageSize());
	producer.setUseTLS(producerProperties.getUseTLS());
	producer.setUnitName(producerProperties.getUnitName());
	producer.setAccessChannel(
			AccessChannel.valueOf(producerProperties.getAccessChannel()));

	// Optional user-supplied hooks, looked up by the names given in the properties;
	// each is registered only if the lookup returns a non-null bean.
	CheckForbiddenHook checkForbiddenHook = RocketMQBeanContainerCache.getBean(
			producerProperties.getCheckForbiddenHook(), CheckForbiddenHook.class);
	if (null != checkForbiddenHook) {
		producer.getDefaultMQProducerImpl()
				.registerCheckForbiddenHook(checkForbiddenHook);
	}
	SendMessageHook sendMessageHook = RocketMQBeanContainerCache
			.getBean(producerProperties.getSendMessageHook(), SendMessageHook.class);
	if (null != sendMessageHook) {
		producer.getDefaultMQProducerImpl().registerSendMessageHook(sendMessageHook);
	}
	return producer;
}