in pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/SourceOfFlume.java [73:102]
/**
 * Collects messages from the {@code StringSink} queue into {@code eventList} and hands
 * them to the channel processor as one batch.
 *
 * <p>Reading stops as soon as {@code eventList} holds {@code getBatchSize()} events, the
 * {@code maxBatchDurationMillis} window has elapsed, or the queue is unavailable. Waiting
 * for the next message uses a timed poll rather than spinning on an empty queue.
 *
 * <p>{@code eventList} is cleared only after {@code processEventBatch} returns normally;
 * if it throws, the collected events stay in the list for the next invocation.
 *
 * @return {@code Status.READY} when a non-empty batch was passed to the channel processor;
 *         {@code Status.BACKOFF} when nothing was collected or when an exception occurred
 */
public Status doProcess() {
    try {
        final long maxBatchEndTime = System.currentTimeMillis() + maxBatchDurationMillis;
        while (eventList.size() < this.getBatchSize()) {
            final long remainingMillis = maxBatchEndTime - System.currentTimeMillis();
            if (remainingMillis <= 0) {
                break;
            }
            final BlockingQueue<Object> blockingQueue = StringSink.getQueue();
            if (blockingQueue == null) {
                // Nothing to read from; return instead of spinning until the deadline.
                break;
            }
            // Timed poll: blocks at most for the rest of the batch window, so an empty
            // queue neither burns CPU nor blocks forever.
            final Object message = blockingQueue.poll(
                    remainingMillis, java.util.concurrent.TimeUnit.MILLISECONDS);
            if (message == null) {
                // Batch window elapsed without a new message.
                break;
            }
            // Encode explicitly as UTF-8 so the payload does not depend on the JVM's
            // platform default charset.
            final byte[] body =
                    message.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
            eventList.add(EventBuilder.withBody(body));
        }
        if (eventList.isEmpty()) {
            return Status.BACKOFF;
        }
        counter.addToEventReceivedCount(eventList.size());
        getChannelProcessor().processEventBatch(eventList);
        eventList.clear();
        return Status.READY;
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Restore the interrupt flag so the caller can observe the interruption.
            Thread.currentThread().interrupt();
        }
        log.error("Flume Source EXCEPTION", e);
        counter.incrementEventReadOrChannelFail(e);
        return Status.BACKOFF;
    }
}