public JmsBaseMessageConsumer()

in rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java [44:84]


    public JmsBaseMessageConsumer(Destination destination, CommonContext commonContext,
        JmsBaseConnection connection) throws JMSException {
        synchronized (LOCK_OBJECT) {
            checkArgs(destination, commonContext);

            if (null == consumerMap.get(context.getConsumerId())) {
                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(context.getConsumerId());
                if (context.getConsumeThreadNums() > 0) {
                    consumer.setConsumeThreadMax(context.getConsumeThreadNums());
                    consumer.setConsumeThreadMin(context.getConsumeThreadNums());
                }
                if (!Strings.isNullOrEmpty(context.getNameServer())) {
                    consumer.setNamesrvAddr(context.getNameServer());
                }
                if (!Strings.isNullOrEmpty(context.getInstanceName())) {
                    consumer.setInstanceName(context.getInstanceName());
                }
                consumer.setConsumeMessageBatchMaxSize(1);
                //add subscribe?
                RMQPushConsumerExt rocketmqConsumerExt = new RMQPushConsumerExt(consumer);
                consumerMap.putIfAbsent(context.getConsumerId(), rocketmqConsumerExt);
            }

            consumerMap.get(context.getConsumerId()).incrementAndGet();

            //If the connection has been started, start the consumer right now.
            //add start status?
            RMQPushConsumerExt consumerExt = consumerMap.get(context.getConsumerId());
            if (connection.isStarted()) {
                try {
                    consumerExt.start();
                }
                catch (MQClientException mqe) {
                    JMSException jmsException = new JMSException("Start consumer failed " + context.getConsumerId());
                    jmsException.initCause(mqe);
                    throw jmsException;
                }
            }
        }

    }