in flume-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java [326:387]
/**
 * Drains up to {@code batchSize} events from the channel inside one channel
 * transaction, converts each event into HBase actions and increments via the
 * configured serializer, and passes the accumulated batch to
 * {@code putEventsAndCommit} together with the open transaction.
 *
 * <p>On any failure the transaction is rolled back (best effort), the failure
 * is logged once and counted on the sink counter, and the error is rethrown.
 * The transaction is always closed before this method returns or throws.
 *
 * @return {@code Status.BACKOFF} when the channel yielded no event at all for
 *     this batch, {@code Status.READY} otherwise
 * @throws EventDeliveryException wrapping any checked throwable raised while
 *     taking, serializing or writing the batch; {@code Error}s and
 *     {@code RuntimeException}s are rethrown unchanged
 */
public Status process() throws EventDeliveryException {
  Status status = Status.READY;
  Channel channel = getChannel();
  Transaction txn = channel.getTransaction();
  List<Row> actions = new LinkedList<>();
  List<Increment> incs = new LinkedList<>();
  try {
    txn.begin();
    if (serializer instanceof BatchAware) {
      ((BatchAware) serializer).onBatchStart();
    }
    long i = 0;
    for (; i < batchSize; i++) {
      Event event = channel.take();
      if (event == null) {
        // Channel ran dry: an empty batch asks the runner to back off,
        // a partial batch is only recorded as an underflow.
        if (i == 0) {
          status = Status.BACKOFF;
          sinkCounter.incrementBatchEmptyCount();
        } else {
          sinkCounter.incrementBatchUnderflowCount();
        }
        break;
      } else {
        serializer.initialize(event, columnFamily);
        actions.addAll(serializer.getActions());
        incs.addAll(serializer.getIncrements());
      }
    }
    if (i == batchSize) {
      sinkCounter.incrementBatchCompleteCount();
    }
    sinkCounter.addToEventDrainAttemptCount(i);
    putEventsAndCommit(actions, incs, txn);
  } catch (Throwable e) {
    final String failureMessage =
        "Failed to commit transaction. Transaction rolled back.";
    try {
      txn.rollback();
    } catch (Exception e2) {
      logger.error("Exception in rollback. Rollback might not have been " +
          "successful.", e2);
    }
    // Log the failure exactly once; the rethrown exception carries the same
    // cause, so repeating the stack trace per branch only adds noise.
    logger.error(failureMessage, e);
    sinkCounter.incrementEventWriteOrChannelFail(e);
    if (e instanceof Error || e instanceof RuntimeException) {
      // Unchecked throwables are rethrown as-is; the explicit throw makes it
      // visible that control never continues past this point.
      throw Throwables.propagate(e);
    }
    throw new EventDeliveryException(failureMessage, e);
  } finally {
    txn.close();
  }
  return status;
}