in flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java [438:512]
/**
 * Commits the work collected in this transaction.
 *
 * <ul>
 *   <li>{@code TransactionType.NONE}: nothing to do, returns immediately.</li>
 *   <li>{@code TransactionType.PUT}: hands every buffered producer record to the Kafka
 *       producer. With {@code useKafkaTransactions} the sends are wrapped in a Kafka
 *       transaction while {@code kafkaTxLock} is held; otherwise the producer is flushed
 *       and every send future is awaited. On success the send timer and put-success
 *       counter are updated and the record/future buffers are cleared.</li>
 *   <li>Any other type (the take path): commits the consumer offsets when no events
 *       failed and at least one event was taken, then updates the take-success counter
 *       and clears the taken events.</li>
 * </ul>
 *
 * @throws ChannelException if sending to Kafka fails. When Kafka transactions are in use
 *         and the lock is still held, the transaction is aborted first; a failure of the
 *         abort itself is attached to the original failure as a suppressed exception.
 * @throws InterruptedException declared by the signature; an interrupt received while
 *         waiting on a send future is reported as a {@code ChannelException} with the
 *         thread's interrupt status restored.
 */
protected void doCommit() throws InterruptedException {
  logger.trace("Starting commit");
  if (type.equals(TransactionType.NONE)) {
    return;
  }
  if (type.equals(TransactionType.PUT)) {
    if (!kafkaFutures.isPresent()) {
      kafkaFutures = Optional.of(new LinkedList<Future<RecordMetadata>>());
    }
    // True only while this thread owns kafkaTxLock for the transaction started below.
    // The failure path uses it so that the abort/unlock happens at most once and never
    // after the transaction has already been committed and the lock released.
    boolean txLockHeld = false;
    try {
      if (useKafkaTransactions) {
        kafkaTxLock.lock();
        txLockHeld = true;
        logger.debug("Beginning Kafka Transaction");
        producer.beginTransaction();
      }
      long batchSize = producerRecords.get().size();
      long startTime = System.nanoTime();
      // The callback receives a 1-based position of the record within the batch.
      int index = 0;
      for (ProducerRecord<String, byte[]> record : producerRecords.get()) {
        index++;
        kafkaFutures.get().add(producer.send(record, new ChannelCallback(index, startTime)));
      }
      if (useKafkaTransactions) {
        logger.debug("Committing Kafka Transaction");
        producer.commitTransaction();
        kafkaTxLock.unlock();
        txLockHeld = false;
      } else {
        // Ensure that the records are actually flushed by the producer, regardless of linger.ms.
        // Per the Kafka docs we do not need to linger or wait for the callback if we're using transactions
        producer.flush();
        for (Future<RecordMetadata> future : kafkaFutures.get()) {
          future.get();
        }
      }
      long endTime = System.nanoTime();
      // Nanoseconds to milliseconds.
      counter.addToKafkaEventSendTimer((endTime - startTime) / (1000 * 1000));
      counter.addToEventPutSuccessCount(batchSize);
      producerRecords.get().clear();
      kafkaFutures.get().clear();
    } catch (Exception ex) {
      if (txLockHeld) {
        logger.debug("Aborting transaction");
        try {
          producer.abortTransaction();
        } catch (Exception abortEx) {
          // Keep the original failure as the primary cause; a failing abort must not
          // replace it or bypass the ChannelException thrown below.
          logger.warn("Aborting Kafka transaction failed", abortEx);
          ex.addSuppressed(abortEx);
        } finally {
          kafkaTxLock.unlock();
        }
      }
      if (ex instanceof InterruptedException) {
        // Restore the interrupt status swallowed by the broad catch. Done after the
        // abort attempt so the abort itself is not disturbed by the pending interrupt.
        Thread.currentThread().interrupt();
      }
      logger.warn("Sending events to Kafka failed", ex);
      throw new ChannelException("Commit failed as send to Kafka failed",
          ex);
    }
  } else {
    // event taken ensures that we have collected events in this transaction
    // before committing
    if (consumerAndRecords.get().failedEvents.isEmpty() && eventTaken) {
      logger.trace("About to commit batch");
      long startTime = System.nanoTime();
      consumerAndRecords.get().commitOffsets();
      long endTime = System.nanoTime();
      // Nanoseconds to milliseconds.
      counter.addToKafkaCommitTimer((endTime - startTime) / (1000 * 1000));
      if (logger.isDebugEnabled()) {
        logger.debug(consumerAndRecords.get().getCommittedOffsetsString());
      }
    }
    int takes = events.get().size();
    if (takes > 0) {
      counter.addToEventTakeSuccessCount(takes);
      events.get().clear();
    }
  }
}