public void start()

in spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProducerMessageHandler.java [110:143]


	public void start() {
		Instrumentation instrumentation = new Instrumentation(destination.getName(),
				this);
		try {
			defaultMQProducer.start();
			// TransactionMQProducer does not currently support custom
			// MessageQueueSelector.
			if (!isTrans && extendedProducerProperties.isPartitioned()) {
				List<MessageQueue> messageQueues = defaultMQProducer
						.fetchPublishMessageQueues(destination.getName());
				if (extendedProducerProperties.getPartitionCount() != messageQueues
						.size()) {
					log.info(String.format(
							"The partition count of topic '%s' will change from '%s' to '%s'",
							destination.getName(),
							extendedProducerProperties.getPartitionCount(),
							messageQueues.size()));
					extendedProducerProperties.setPartitionCount(messageQueues.size());
					// may be npe!
					partitioningInterceptor.setPartitionCount(
							extendedProducerProperties.getPartitionCount());
				}
			}
			running = true;
			instrumentation.markStartedSuccessfully();
		}
		catch (MQClientException | NullPointerException e) {
			instrumentation.markStartFailed(e);
			log.error("The defaultMQProducer startup failure !!!", e);
		}
		finally {
			InstrumentationManager.addHealthInstrumentation(instrumentation);
		}
	}