protected void doCommit()

in flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java [438:512]

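Commits the current channel transaction. For PUT transactions the records buffered by doPut() are sent to Kafka, either inside a Kafka transaction (when useKafkaTransactions is enabled) or followed by an explicit flush and a wait on every send future; for TAKE transactions the consumer offsets are committed once events have actually been taken and no rolled-back events are pending. NONE transactions are a no-op.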

    protected void doCommit() throws InterruptedException {
      logger.trace("Starting commit");
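      // A NONE transaction has no pending puts or takes; nothing to commit.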
      if (type.equals(TransactionType.NONE)) {
        return;
      }
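      // PUT path: deliver the records buffered during doPut() to Kafka.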
      if (type.equals(TransactionType.PUT)) {
        if (!kafkaFutures.isPresent()) {
          kafkaFutures = Optional.of(new LinkedList<Future<RecordMetadata>>());
        }
        try {
          if (useKafkaTransactions) {
            kafkaTxLock.lock();
            logger.debug("Beginning Kafka Transaction");
            producer.beginTransaction();
          }
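          // Queue each record for asynchronous send; ChannelCallback handles per-record completion.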
          long batchSize = producerRecords.get().size();
          long startTime = System.nanoTime();
          int index = 0;
          for (ProducerRecord<String, byte[]> record : producerRecords.get()) {
            index++;
            kafkaFutures.get().add(producer.send(record, new ChannelCallback(index, startTime)));
          }

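          // commitTransaction() flushes any unsent records before committing, so no explicit flush is needed on this path.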
          if (useKafkaTransactions) {
            logger.debug("Committing Kafka Transaction");
            producer.commitTransaction();
            kafkaTxLock.unlock();
          } else {
            // Ensure the records are actually flushed by the producer, regardless of linger.ms;
            // without Kafka transactions we must flush and wait on the send futures ourselves.
            producer.flush();

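            // Block on each future so a failed send fails the commit instead of being dropped.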
            for (Future<RecordMetadata> future : kafkaFutures.get()) {
              future.get();
            }
          }
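          // Convert elapsed nanoseconds to milliseconds for the send timer.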
          long endTime = System.nanoTime();
          counter.addToKafkaEventSendTimer((endTime - startTime) / (1000 * 1000));
          counter.addToEventPutSuccessCount(batchSize);
          producerRecords.get().clear();
          kafkaFutures.get().clear();
        } catch (Exception ex) {
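          // Roll back any open Kafka transaction; the finally guarantees the lock is released even if abort throws.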
          if (useKafkaTransactions) {
            logger.debug("Aborting transaction");
            try {
              producer.abortTransaction();
            } finally {
              kafkaTxLock.unlock();
            }
          }
          logger.warn("Sending events to Kafka failed", ex);
          throw new ChannelException("Commit failed as send to Kafka failed",
                  ex);
        }
      } else {
        // eventTaken ensures this transaction actually took events before we commit
        // offsets; if failedEvents is non-empty (a prior rollback), skip the commit
        // so those events are not lost.
        if (consumerAndRecords.get().failedEvents.isEmpty() && eventTaken) {
          logger.trace("About to commit batch");
          long startTime = System.nanoTime();
          consumerAndRecords.get().commitOffsets();
          long endTime = System.nanoTime();
          counter.addToKafkaCommitTimer((endTime - startTime) / (1000 * 1000));
          if (logger.isDebugEnabled()) {
            logger.debug(consumerAndRecords.get().getCommittedOffsetsString());
          }
        }

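        // Credit successful takes and clear the per-transaction take buffer.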
        int takes = events.get().size();
        if (takes > 0) {
          counter.addToEventTakeSuccessCount(takes);
          events.get().clear();
        }
      }
    }
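
The transactional branch assumes the producer was constructed with a transactional.id and that initTransactions() was called before the first beginTransaction(); otherwise beginTransaction() throws IllegalStateException. A minimal sketch of such a setup (the helper name, bootstrap address, and transactional id are illustrative, not taken from Flume's configuration):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    static KafkaProducer<String, byte[]> newTransactionalProducer() {
      Properties props = new Properties();
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");       // illustrative
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
      // transactional.id is what makes beginTransaction()/commitTransaction() legal on this producer.
      props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "flume-kafka-channel-tx"); // illustrative
      KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
      producer.initTransactions(); // must run once, before the first beginTransaction()
      return producer;
    }

On the TAKE side, commitOffsets() is a helper on the consumerAndRecords holder whose body falls outside this excerpt. In plain Kafka consumer terms, committing a specific offset looks roughly like the sketch below (commitNextOffset is a hypothetical illustration, not the Flume method):

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    static void commitNextOffset(KafkaConsumer<String, byte[]> consumer,
                                 String topic, int partition, long lastReadOffset) {
      // Kafka stores the offset of the NEXT record to consume, hence the +1.
      Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
          new TopicPartition(topic, partition), new OffsetAndMetadata(lastReadOffset + 1));
      consumer.commitSync(offsets); // blocks until the broker acknowledges the commit
    }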