in rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java [44:84]
/**
 * Builds a message consumer that shares one {@link RMQPushConsumerExt} per consumer id.
 *
 * <p>While holding {@code LOCK_OBJECT}, the constructor looks up the consumer id in
 * {@code consumerMap}. When no entry exists it creates a {@link DefaultMQPushConsumer},
 * applies the optional thread count, name server and instance name taken from
 * {@code context}, pins the consume batch size to 1, and registers the wrapper in the map.
 * The entry's counter is then bumped via {@code incrementAndGet()}, and the consumer is
 * started immediately if the connection reports that it is already started.
 *
 * @param destination   passed to {@code checkArgs}
 * @param commonContext passed to {@code checkArgs}; the settings used here are read from the
 *                      {@code context} field (presumably populated by {@code checkArgs} —
 *                      TODO confirm)
 * @param connection    queried with {@code isStarted()} to decide whether to start the consumer
 * @throws JMSException if starting the consumer fails with an {@code MQClientException}
 *                      (the original exception is attached as the cause); {@code checkArgs}
 *                      may presumably throw as well — verify
 */
public JmsBaseMessageConsumer(Destination destination, CommonContext commonContext,
    JmsBaseConnection connection) throws JMSException {
    synchronized (LOCK_OBJECT) {
        checkArgs(destination, commonContext);
        final String consumerId = context.getConsumerId();

        // First consumer for this id: build and register the underlying push consumer.
        if (consumerMap.get(consumerId) == null) {
            DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(consumerId);

            // A positive thread count fixes the consume pool size (max and min are equal).
            final int threadCount = context.getConsumeThreadNums();
            if (threadCount > 0) {
                pushConsumer.setConsumeThreadMax(threadCount);
                pushConsumer.setConsumeThreadMin(threadCount);
            }

            final String nameServer = context.getNameServer();
            if (!Strings.isNullOrEmpty(nameServer)) {
                pushConsumer.setNamesrvAddr(nameServer);
            }

            final String instanceName = context.getInstanceName();
            if (!Strings.isNullOrEmpty(instanceName)) {
                pushConsumer.setInstanceName(instanceName);
            }

            pushConsumer.setConsumeMessageBatchMaxSize(1);
            // TODO: open question carried over from the original code — add subscribe here?
            consumerMap.putIfAbsent(consumerId, new RMQPushConsumerExt(pushConsumer));
        }

        // Re-read from the map so the registered instance is the one that gets counted and started.
        RMQPushConsumerExt sharedConsumer = consumerMap.get(consumerId);
        sharedConsumer.incrementAndGet();

        // Nothing more to do until the connection itself is started.
        // TODO: open question carried over from the original code — add start status?
        if (!connection.isStarted()) {
            return;
        }

        try {
            sharedConsumer.start();
        } catch (MQClientException startFailure) {
            // NOTE(review): the counter incremented above is not rolled back on this path —
            // confirm whether that is intended.
            JMSException wrapped = new JMSException("Start consumer failed " + consumerId);
            wrapped.initCause(startFailure);
            throw wrapped;
        }
    }
}