in rocketmq-flume/rocketmq-flume-source/src/main/java/org/apache/rocketmq/flume/ng/source/RocketMQSource.java [110:150]
protected Status doProcess() {
    // Polls the consumer once, wraps every returned message body in a Flume
    // event and, if at least one event was built, hands the batch to the
    // channel processor while updating the source counters.
    //
    // Returns Status.READY when the poll and the channel hand-off complete
    // without an exception (including when the poll yields no messages), and
    // Status.BACKOFF when any exception is raised along the way.
    List<Event> events = new ArrayList<>();
    try {
        List<MessageExt> messageExts = consumer.poll();
        // Evaluate once: decoding each body to a String is only worth doing
        // when the debug line will actually be emitted.
        final boolean debugEnabled = log.isDebugEnabled();
        for (MessageExt msg : messageExts) {
            byte[] body = msg.getBody();
            // Headers carry the topic/tag fields of this source, not values
            // read from the individual message.
            Map<String, String> headers = new HashMap<>();
            headers.put(HEADER_TOPIC_NAME, topic);
            headers.put(HEADER_TAG_NAME, tag);
            if (debugEnabled) {
                // Null-safe so that a log statement can never abort the batch.
                String bodyText = (body == null)
                    ? null
                    : new String(body, java.nio.charset.StandardCharsets.UTF_8);
                log.debug("Processing message,body={}", bodyText);
            }
            // NOTE(review): a null body is now passed through to EventBuilder
            // instead of failing in the log call — confirm EventBuilder accepts it.
            events.add(EventBuilder.withBody(body, headers));
        }
        if (!events.isEmpty()) {
            // "Received" counters are bumped before the hand-off and
            // "accepted" counters only after it returns normally.
            sourceCounter.incrementAppendBatchReceivedCount();
            sourceCounter.addToEventReceivedCount(events.size());
            getChannelProcessor().processEventBatch(events);
            sourceCounter.incrementAppendBatchAcceptedCount();
            sourceCounter.addToEventAcceptedCount(events.size());
        }
    } catch (Exception e) {
        // Any failure (poll, event construction, channel hand-off) is logged
        // and reported as BACKOFF rather than propagated to the caller.
        log.error("Failed to consumer message", e);
        return Status.BACKOFF;
    }
    return Status.READY;
}