in rocketmq-logstash-integration/rocketmq-logstash-input/src/main/java/org/apache/rocketmq/logstashplugin/input/RocketMQ.java [103:130]
public void start(Consumer<Map<String, Object>> consumer) {
if (this.stopped) {
return;
}
logger.info("Start processing");
try {
createAndStartConsumer();
while (!stopped) {
final List<MessageExt> messageExtList = this.rmqConsumer.poll();
if (null == messageExtList || messageExtList.isEmpty()) {
continue;
}
for (MessageExt messageExt : messageExtList) {
this.codec.decode(ByteBuffer.wrap(messageExt.getBody()), m -> {
consumer.accept(m);
});
}
}
} catch (MQClientException e) {
logger.error("consumer starts failed", e);
} finally {
stopped = true;
done.countDown();
}
}