in flume-kudu-sink/src/main/java/org/apache/flume/sink/kudu/KuduSink.java [238:314]
public synchronized Status process() throws EventDeliveryException {
if (session.hasPendingOperations()) {
// If for whatever reason we have pending operations, refuse to process
// more and tell the caller to try again a bit later. We don't want to
// pile on the KuduSession.
return Status.BACKOFF;
}
Channel channel = getChannel();
Transaction txn = channel.getTransaction();
txn.begin();
try {
long txnEventCount = 0;
for (; txnEventCount < batchSize; txnEventCount++) {
Event event = channel.take();
if (event == null) {
break;
}
List<Operation> operations = operationsProducer.getOperations(event);
for (Operation o : operations) {
session.apply(o);
}
}
logger.debug("Flushing {} events", txnEventCount);
List<OperationResponse> responses = session.flush();
if (responses != null) {
for (OperationResponse response : responses) {
// Throw an EventDeliveryException if at least one of the responses was
// a row error. Row errors can occur for example when an event is inserted
// into Kudu successfully but the Flume transaction is rolled back for some reason,
// and a subsequent replay of the same Flume transaction leads to a
// duplicate key error since the row already exists in Kudu.
// Note: Duplicate keys will not be reported as errors if ignoreDuplicateRows
// is enabled in the config.
if (response.hasRowError()) {
throw new EventDeliveryException("Failed to flush one or more changes. " +
"Transaction rolled back: " + response.getRowError().toString());
}
}
}
if (txnEventCount == 0) {
sinkCounter.incrementBatchEmptyCount();
} else if (txnEventCount == batchSize) {
sinkCounter.incrementBatchCompleteCount();
} else {
sinkCounter.incrementBatchUnderflowCount();
}
txn.commit();
if (txnEventCount == 0) {
return Status.BACKOFF;
}
sinkCounter.addToEventDrainSuccessCount(txnEventCount);
return Status.READY;
} catch (Throwable e) {
txn.rollback();
String msg = "Failed to commit transaction. Transaction rolled back.";
logger.error(msg, e);
if (e instanceof Error || e instanceof RuntimeException) {
throw new RuntimeException(e);
} else {
logger.error(msg, e);
throw new EventDeliveryException(msg, e);
}
} finally {
txn.close();
}
}