public Status doProcess()

in pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/SourceOfFlume.java [73:102]


    public Status doProcess() {
        Event event;
        String eventBody;
        try {
            final long maxBatchEndTime = System.currentTimeMillis() + maxBatchDurationMillis;

            while (eventList.size() < this.getBatchSize() &&
                    System.currentTimeMillis() < maxBatchEndTime) {
                BlockingQueue<Object> blockingQueue = StringSink.getQueue();
                while (blockingQueue != null && !blockingQueue.isEmpty()) {
                    Object message = blockingQueue.take();
                    eventBody = message.toString();
                    event = EventBuilder.withBody(eventBody.getBytes());
                    eventList.add(event);
                }
            }
            if (eventList.size() > 0) {
                counter.addToEventReceivedCount(eventList.size());
                getChannelProcessor().processEventBatch(eventList);
                eventList.clear();
                return Status.READY;
            }
            return Status.BACKOFF;

        } catch (Exception e) {
            log.error("Flume Source EXCEPTION", e);
            counter.incrementEventReadOrChannelFail(e);
            return Status.BACKOFF;
        }
    }