protected Status doProcess()

in rocketmq-flume/rocketmq-flume-source/src/main/java/org/apache/rocketmq/flume/ng/source/RocketMQSource.java [110:150]


    /**
     * Polls the consumer once and forwards the returned messages to the channel
     * as a single batch.
     *
     * <p>Each message body becomes the body of one event. Every event carries two
     * headers, {@code HEADER_TOPIC_NAME} and {@code HEADER_TAG_NAME}, populated from
     * this source's {@code topic} and {@code tag} fields (not from the individual
     * message). The received counters are updated before the batch is handed to the
     * channel processor and the accepted counters after it returns.
     *
     * @return {@code Status.READY} when the poll and the hand-off completed without
     *         an exception (including when the poll returned no messages);
     *         {@code Status.BACKOFF} when any exception was thrown, after logging it
     */
    protected Status doProcess() {

        try {
            List<MessageExt> messageExts = consumer.poll();
            List<Event> events = new ArrayList<>(messageExts.size());

            for (MessageExt msg : messageExts) {
                byte[] body = msg.getBody();

                Map<String, String> headers = new HashMap<>();
                headers.put(HEADER_TOPIC_NAME, topic);
                headers.put(HEADER_TAG_NAME, tag);

                // Decode the payload only when it will actually be logged: the
                // String copy is wasted work otherwise, and a null body must not
                // fail the whole batch just because of a log statement.
                if (log.isDebugEnabled()) {
                    log.debug("Processing message,body={}",
                        body == null ? null : new String(body, java.nio.charset.StandardCharsets.UTF_8));
                }

                events.add(EventBuilder.withBody(body, headers));
            }

            if (!events.isEmpty()) {
                sourceCounter.incrementAppendBatchReceivedCount();
                sourceCounter.addToEventReceivedCount(events.size());

                getChannelProcessor().processEventBatch(events);

                sourceCounter.incrementAppendBatchAcceptedCount();
                sourceCounter.addToEventAcceptedCount(events.size());
            }

        } catch (Exception e) {
            // Any failure (poll, conversion, or channel hand-off) asks the caller to back off.
            log.error("Failed to consumer message", e);
            return Status.BACKOFF;
        }

        return Status.READY;
    }