in spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProducerMessageHandler.java [110:143]
/**
 * Starts the underlying producer and registers the outcome as a health
 * instrumentation.
 *
 * <p>For a non-transactional, partitioned producer the publish queues of the
 * destination topic are fetched. When their number differs from the configured
 * partition count, the producer properties are updated to the actual queue count,
 * and so is the partitioning interceptor if one has been set.
 *
 * <p>Startup failures are not rethrown: they are logged and recorded on the
 * instrumentation via {@code markStartFailed}, and {@code running} is not set.
 * The instrumentation is always handed to {@code InstrumentationManager},
 * whether startup succeeded or failed.
 */
public void start() {
	Instrumentation instrumentation = new Instrumentation(destination.getName(),
			this);
	try {
		defaultMQProducer.start();
		// TransactionMQProducer does not currently support custom
		// MessageQueueSelector.
		if (!isTrans && extendedProducerProperties.isPartitioned()) {
			List<MessageQueue> messageQueues = defaultMQProducer
					.fetchPublishMessageQueues(destination.getName());
			int actualQueueCount = messageQueues.size();
			if (extendedProducerProperties.getPartitionCount() != actualQueueCount) {
				log.info(String.format(
						"The partition count of topic '%s' will change from '%s' to '%s'",
						destination.getName(),
						extendedProducerProperties.getPartitionCount(),
						actualQueueCount));
				extendedProducerProperties.setPartitionCount(actualQueueCount);
				// The interceptor may be absent. Check explicitly instead of
				// relying on a caught NullPointerException, which would flag an
				// already-started producer as failed after the properties were
				// updated.
				if (partitioningInterceptor != null) {
					partitioningInterceptor.setPartitionCount(
							extendedProducerProperties.getPartitionCount());
				}
				else {
					log.warn(String.format(
							"No partitioning interceptor is set for topic '%s'; "
									+ "skipped updating its partition count to '%s'",
							destination.getName(), actualQueueCount));
				}
			}
		}
		running = true;
		instrumentation.markStartedSuccessfully();
	}
	// NullPointerException stays in the catch list only to keep the previous
	// behaviour for any other unexpected null during startup.
	catch (MQClientException | NullPointerException e) {
		instrumentation.markStartFailed(e);
		log.error("The defaultMQProducer startup failure !!!", e);
	}
	finally {
		InstrumentationManager.addHealthInstrumentation(instrumentation);
	}
}