private void createAndStartConsumer()

in rocketmq-logstash-integration/rocketmq-logstash-input/src/main/java/org/apache/rocketmq/logstashplugin/input/RocketMQ.java [132:158]


    /**
     * Builds the lite pull consumer from this plugin's settings, subscribes it to the
     * configured topic and starts it.
     *
     * <p>Settings are applied as follows:
     * <ul>
     *   <li>{@code consumerThreads} is applied on a best-effort basis: if it cannot be
     *       converted to an {@code int} or applied, a warning is logged and the pull
     *       thread number is left untouched on the consumer.</li>
     *   <li>{@code consumeFromWhere} is matched against the {@link ConsumeFromWhere}
     *       constant names; {@code CONSUME_FROM_TIMESTAMP} is only honoured when a
     *       non-empty {@code consumeTimestamp} is present. Anything else falls back to
     *       {@code CONSUME_FROM_LAST_OFFSET}.</li>
     *   <li>{@code messageModel} selects {@code BROADCASTING} only on an exact name
     *       match; anything else falls back to {@code CLUSTERING}.</li>
     * </ul>
     *
     * @throws MQClientException propagated from the consumer calls made here
     *         (the subscription and the start-up)
     */
    private void createAndStartConsumer() throws MQClientException {
        rmqConsumer = new DefaultLitePullConsumer(this.group);
        rmqConsumer.setNamesrvAddr(this.namesrvAddr);

        // Best effort: a bad thread-count setting must not prevent the consumer from
        // starting, but it must be visible to the operator instead of silently dropped.
        try {
            final int consumeThreadNum = Math.toIntExact(this.consumerThreads);
            rmqConsumer.setPullThreadNums(consumeThreadNum);
        } catch (Exception e) {
            logger.warn("Ignoring invalid consumer thread setting [{}]; pull thread number left unchanged",
                    this.consumerThreads, e);
        }

        // Pick the starting position; unknown or incomplete settings fall back to the last offset.
        if (ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET.name().equals(this.consumeFromWhere)) {
            rmqConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        } else if (ConsumeFromWhere.CONSUME_FROM_TIMESTAMP.name().equals(this.consumeFromWhere)
                && null != this.consumeTimestamp && !"".equals(this.consumeTimestamp)) {
            // Timestamp mode is only usable when a timestamp was actually supplied.
            rmqConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
            rmqConsumer.setConsumeTimestamp(this.consumeTimestamp);
        } else {
            rmqConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        }

        // Clustering is the fallback for any value other than an exact BROADCASTING match.
        if (MessageModel.BROADCASTING.name().equals(this.messageModel)) {
            rmqConsumer.setMessageModel(MessageModel.BROADCASTING);
        } else {
            rmqConsumer.setMessageModel(MessageModel.CLUSTERING);
        }

        rmqConsumer.subscribe(this.topic, this.subExpression);
        this.rmqConsumer.start();
        logger.warn("LitePullConsumer started: {}", this.rmqConsumer);
    }