public Status process()

in rocketmq-flume/rocketmq-flume-sink/src/main/java/org/apache/rocketmq/flume/ng/sink/RocketMQSink.java [114:196]


    /**
     * Takes one batch of events from the channel and sends them to RocketMQ
     * inside a single channel transaction.
     *
     * <p>Events are collected until {@code batchSize} events have been taken or
     * more than {@code maxProcessTime} milliseconds have elapsed since collection
     * started. Every event body is wrapped in a {@link Message} for the configured
     * {@code topic}/{@code tag} and handed to {@code producer.send} together with a
     * {@code SendCallBackHandler}. The method then blocks on a latch sized to the
     * batch and commits the transaction only if the error counter is still zero.
     *
     * @return {@code Status.READY} when the batch was committed;
     *         {@code Status.BACKOFF} when no event was taken, when at least one
     *         send error was counted, or when any exception was raised while
     *         processing
     * @throws EventDeliveryException if rolling the transaction back fails after
     *                                a processing error
     */
    public Status process() throws EventDeliveryException {

        Channel channel = getChannel();
        Transaction transaction = null;
        // Remembered so the interrupt flag can be restored once cleanup is done.
        boolean interrupted = false;

        try {
            transaction = channel.getTransaction();
            transaction.begin();

            /*
            batch take
             */
            List<Event> events = new ArrayList<>();
            long beginTime = System.currentTimeMillis();
            // NOTE(review): a null take() does not end the loop; it keeps polling
            // until the batch is full or maxProcessTime has elapsed. Whether take()
            // itself blocks depends on the channel implementation - confirm.
            while (true) {
                Event event = channel.take();
                if (event != null) {
                    events.add(event);
                }

                if (events.size() == batchSize
                    || System.currentTimeMillis() - beginTime > maxProcessTime) {
                    break;
                }
            }

            if (events.isEmpty()) {
                sinkCounter.incrementBatchEmptyCount();

                transaction.rollback();
                return Status.BACKOFF;
            }
            /*
            async send
             */
            // One latch count per event; presumably SendCallBackHandler counts the
            // latch down and increments errorNum on failure - verify against it.
            CountDownLatch latch = new CountDownLatch(events.size());
            AtomicInteger errorNum = new AtomicInteger();

            for (Event event : events) {
                byte[] body = event.getBody();
                Message message = new Message(topic, tag, body);

                if (log.isDebugEnabled()) {
                    log.debug("Processing event,body={}", new String(body, "UTF-8"));
                }
                producer.send(message, new SendCallBackHandler(message, latch, errorNum));
            }
            latch.await();

            sinkCounter.addToEventDrainAttemptCount(events.size());

            if (errorNum.get() > 0) {
                log.error("errorNum={},transaction will rollback", errorNum.get());
                transaction.rollback();
                return Status.BACKOFF;
            } else {
                transaction.commit();

                sinkCounter.addToEventDrainSuccessCount(events.size());

                return Status.READY;
            }

        } catch (Throwable e) {
            // latch.await() may be interrupted; record it so the flag is not lost
            // when the exception is absorbed here.
            if (e instanceof InterruptedException) {
                interrupted = true;
            }
            log.error("Failed to processing event", e);

            if (transaction != null) {
                try {
                    transaction.rollback();
                } catch (Throwable ex) {
                    log.error("Failed to rollback transaction", ex);
                    throw new EventDeliveryException("Failed to rollback transaction", ex);
                }
            }

            return Status.BACKOFF;

        } finally {
            try {
                if (transaction != null) {
                    transaction.close();
                }
            } finally {
                // Re-assert the interrupt only after rollback/close have run, so
                // the caller can still observe that this thread was interrupted.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }