public void start()

in rocketmq-logstash-integration/rocketmq-logstash-input/src/main/java/org/apache/rocketmq/logstashplugin/input/RocketMQ.java [103:130]


    /**
     * Runs the input loop: starts the RocketMQ consumer, then repeatedly polls it,
     * decodes every message body with the configured codec and hands each decoded
     * event to {@code consumer}. The loop ends once {@code stopped} becomes true.
     *
     * <p>On every exit path (including the case where a stop was already requested
     * before this method was called) {@code stopped} is set to {@code true} and
     * {@code done} is counted down, so that anything waiting on {@code done} is
     * released. NOTE(review): presumably {@code awaitStop()} waits on {@code done} —
     * confirm against the rest of the class.
     *
     * <p>An {@link MQClientException} is logged and ends the loop; it is not rethrown.
     *
     * @param consumer receives each event map produced by the codec
     */
    public void start(Consumer<Map<String, Object>> consumer) {
        try {
            if (this.stopped) {
                // Stop was requested before we started. Returning from inside the
                // try ensures the finally block still releases `done`.
                return;
            }
            logger.info("Start processing");

            createAndStartConsumer();

            while (!stopped) {
                final List<MessageExt> messageExtList = this.rmqConsumer.poll();
                if (messageExtList != null && !messageExtList.isEmpty()) {
                    for (MessageExt messageExt : messageExtList) {
                        this.codec.decode(ByteBuffer.wrap(messageExt.getBody()), consumer::accept);
                    }
                }
            }
        } catch (MQClientException e) {
            logger.error("consumer starts failed", e);
        } finally {
            stopped = true;
            done.countDown();
        }
    }