in flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java [369:435]
protected Event doTake() throws InterruptedException {
  logger.trace("Starting event take");
  type = TransactionType.TAKE;
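  // The consumer lives in a thread-local. If its UUID does not match this
  // channel's UUID, it belongs to a stale channel instance, so tear it down
  // and let a fresh consumer be created on the next access.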
  try {
    if (!(consumerAndRecords.get().uuid.equals(channelUUID))) {
      logger.info("UUID mismatch, creating new consumer");
      decommissionConsumerAndRecords(consumerAndRecords.get());
      consumerAndRecords.remove();
    }
  } catch (Exception ex) {
    logger.warn("Error while shutting down consumer", ex);
  }
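  // Lazily create the per-transaction list of taken events; it is kept so the
  // events can be replayed if this transaction is rolled back.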
  if (!events.isPresent()) {
    events = Optional.of(new LinkedList<Event>());
  }
  Event e;
  // Give the channel a chance to commit if there has been a rebalance
  if (rebalanceFlag.get()) {
    logger.debug("Returning null event after Consumer rebalance.");
    return null;
  }
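  // Serve events returned by an earlier rolled-back transaction before
  // polling Kafka for new records.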
  if (!consumerAndRecords.get().failedEvents.isEmpty()) {
    e = consumerAndRecords.get().failedEvents.removeFirst();
  } else {
    if (logger.isTraceEnabled()) {
      logger.trace("Assignment during take: {}",
          consumerAndRecords.get().consumer.assignment().toString());
    }
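    // Only poll the broker when the locally buffered batch is exhausted.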
    try {
      long startTime = System.nanoTime();
      if (!consumerAndRecords.get().recordIterator.hasNext()) {
        consumerAndRecords.get().poll();
      }
      if (consumerAndRecords.get().recordIterator.hasNext()) {
        ConsumerRecord<String, byte[]> record = consumerAndRecords.get().recordIterator.next();
        e = deserializeValue(record.value(), parseAsFlumeEvent);
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        // Kafka commits the offset of the next record to consume, hence offset + 1
        OffsetAndMetadata oam = new OffsetAndMetadata(record.offset() + 1, batchUUID);
        consumerAndRecords.get().saveOffsets(tp, oam);
        // Add the key to the header
        if (record.key() != null) {
          e.getHeaders().put(KEY_HEADER, record.key());
        }
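        // Report the fetch latency, converted from nanoseconds to milliseconds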
        long endTime = System.nanoTime();
        counter.addToKafkaEventGetTimer((endTime - startTime) / (1000 * 1000));
        if (logger.isDebugEnabled()) {
          logger.debug("{} processed output from partition {} offset {}",
              new Object[] {getName(), record.partition(), record.offset()});
        }
      } else {
        // Nothing buffered even after polling; the channel is empty for now
        return null;
      }
      counter.incrementEventTakeAttemptCount();
    } catch (Exception ex) {
      logger.warn("Error while getting events from Kafka. This is usually caused by " +
          "trying to read a non-flume event. Ensure the setting for " +
          "parseAsFlumeEvent is correct", ex);
      throw new ChannelException("Error while getting events from Kafka", ex);
    }
  }
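  // Mark the take and track the event so commit/rollback handling can
  // advance offsets or replay it as appropriate.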
  eventTaken = true;
  events.get().add(e);
  return e;
}