in rocketmq-logstash-integration/rocketmq-logstash-input/src/main/java/org/apache/rocketmq/logstashplugin/input/RocketMQ.java [132:158]
/**
 * Builds the {@code DefaultLitePullConsumer} for this input from the plugin's
 * configuration fields, subscribes it to the configured topic and starts it.
 *
 * <p>Configuration is applied as follows:
 * <ul>
 *   <li>the pull thread count is taken from {@code consumerThreads}; if that value
 *       cannot be applied, a warning is logged and startup continues without
 *       setting it;</li>
 *   <li>the start position is {@code CONSUME_FROM_FIRST_OFFSET} or
 *       {@code CONSUME_FROM_TIMESTAMP} when requested by name (the latter only when a
 *       non-empty {@code consumeTimestamp} is present), otherwise
 *       {@code CONSUME_FROM_LAST_OFFSET};</li>
 *   <li>the message model is {@code BROADCASTING} when requested by name, otherwise
 *       {@code CLUSTERING}.</li>
 * </ul>
 *
 * @throws MQClientException declared for failures reported by the RocketMQ client
 *         while subscribing or starting the consumer
 */
private void createAndStartConsumer() throws MQClientException {
    rmqConsumer = new DefaultLitePullConsumer(this.group);
    rmqConsumer.setNamesrvAddr(this.namesrvAddr);

    try {
        // toIntExact rejects values that do not fit in an int instead of truncating them.
        final int consumeThreadNum = Math.toIntExact(this.consumerThreads);
        rmqConsumer.setPullThreadNums(consumeThreadNum);
    } catch (RuntimeException e) {
        // Best effort: a bad thread count must not prevent the consumer from starting,
        // but it must not be hidden either. The pull thread count is simply left unset.
        logger.warn("Invalid consumerThreads value '{}', pull thread count left unchanged",
            this.consumerThreads, e);
    }

    final boolean hasConsumeTimestamp =
        this.consumeTimestamp != null && !this.consumeTimestamp.isEmpty();
    if (ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET.name().equals(this.consumeFromWhere)) {
        rmqConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    } else if (ConsumeFromWhere.CONSUME_FROM_TIMESTAMP.name().equals(this.consumeFromWhere)
        && hasConsumeTimestamp) {
        rmqConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
        rmqConsumer.setConsumeTimestamp(this.consumeTimestamp);
    } else {
        // Also reached when CONSUME_FROM_TIMESTAMP is requested without a timestamp.
        rmqConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    }

    if (MessageModel.BROADCASTING.name().equals(this.messageModel)) {
        rmqConsumer.setMessageModel(MessageModel.BROADCASTING);
    } else {
        rmqConsumer.setMessageModel(MessageModel.CLUSTERING);
    }

    rmqConsumer.subscribe(this.topic, this.subExpression);
    this.rmqConsumer.start();
    logger.warn("LitePullConsumer started: {}", this.rmqConsumer);
}