protected Event doTake()

in flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java [369:435]


    /**
     * Takes the next event for the current transaction and marks the transaction as a TAKE.
     *
     * <p>Events are served in this order: first anything queued in {@code failedEvents} of the
     * current {@code consumerAndRecords} value, then the next record from its record iterator,
     * polling once when the iterator is exhausted. For a record read from the iterator, the
     * position {@code record.offset() + 1} (tagged with {@code batchUUID}) is handed to
     * {@code saveOffsets}, and the record key, when non-null, is copied into the
     * {@code KEY_HEADER} header of the event.
     *
     * <p>Every event returned is also appended to {@code events} and {@code eventTaken} is set.
     *
     * @return the event taken, or {@code null} when {@code rebalanceFlag} is set or when no
     *         record is available after polling
     * @throws ChannelException if anything fails while polling, reading or deserializing a
     *         record
     * @throws InterruptedException declared by the signature
     */
    protected Event doTake() throws InterruptedException {
      logger.trace("Starting event take");
      type = TransactionType.TAKE;

      // A consumer holder whose UUID differs from the channel's is decommissioned and removed.
      // NOTE(review): the following get() calls presumably obtain a fresh holder - confirm
      // against the definition of consumerAndRecords.
      try {
        boolean sameUuid = consumerAndRecords.get().uuid.equals(channelUUID);
        if (!sameUuid) {
          logger.info("UUID mismatch, creating new consumer");
          decommissionConsumerAndRecords(consumerAndRecords.get());
          consumerAndRecords.remove();
        }
      } catch (Exception cause) {
        // Best effort: a failure here is only logged and the take carries on.
        logger.warn("Error while shutting down consumer", cause);
      }

      // Lazily create the list that collects the events taken in this transaction.
      if (!events.isPresent()) {
        events = Optional.of(new LinkedList<Event>());
      }

      // Give the channel a chance to commit if there has been a rebalance
      if (rebalanceFlag.get()) {
        logger.debug("Returning null event after Consumer rebalance.");
        return null;
      }

      Event taken;
      if (consumerAndRecords.get().failedEvents.isEmpty()) {
        if (logger.isTraceEnabled()) {
          logger.trace("Assignment during take: {}",
              consumerAndRecords.get().consumer.assignment().toString());
        }
        try {
          long pollStart = System.nanoTime();

          // Only poll when the records already fetched have been used up.
          if (!consumerAndRecords.get().recordIterator.hasNext()) {
            consumerAndRecords.get().poll();
          }
          if (!consumerAndRecords.get().recordIterator.hasNext()) {
            // Still nothing after polling: no event to hand out on this call.
            return null;
          }

          ConsumerRecord<String, byte[]> kafkaRecord =
              consumerAndRecords.get().recordIterator.next();
          taken = deserializeValue(kafkaRecord.value(), parseAsFlumeEvent);

          // Save the offset following this record for its topic partition.
          TopicPartition partition =
              new TopicPartition(kafkaRecord.topic(), kafkaRecord.partition());
          OffsetAndMetadata nextOffset =
              new OffsetAndMetadata(kafkaRecord.offset() + 1, batchUUID);
          consumerAndRecords.get().saveOffsets(partition, nextOffset);

          // Add the key to the header
          String recordKey = kafkaRecord.key();
          if (recordKey != null) {
            taken.getHeaders().put(KEY_HEADER, recordKey);
          }

          // The timer is fed whole milliseconds; the measurement is taken in nanoseconds.
          long elapsedNanos = System.nanoTime() - pollStart;
          counter.addToKafkaEventGetTimer(elapsedNanos / 1000000L);

          if (logger.isDebugEnabled()) {
            logger.debug("{} processed output from partition {} offset {}",
                new Object[] {getName(), kafkaRecord.partition(), kafkaRecord.offset()});
          }

          // Counted only when a record was actually read from the iterator.
          counter.incrementEventTakeAttemptCount();
        } catch (Exception cause) {
          logger.warn("Error while getting events from Kafka. This is usually caused by " +
                      "trying to read a non-flume event. Ensure the setting for " +
                      "parseAsFlumeEvent is correct", cause);
          throw new ChannelException("Error while getting events from Kafka", cause);
        }
      } else {
        // Events queued in failedEvents are handed out before any new record is read.
        taken = consumerAndRecords.get().failedEvents.removeFirst();
      }

      eventTaken = true;
      events.get().add(taken);
      return taken;
    }