in flume-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java [389:437]
/**
 * Sends the supplied row actions and increments to the table, then commits
 * the transaction and records the drained events on the sink counter.
 *
 * <p>The work is done in two privileged steps, each ending with a flush of
 * the table: first the row actions, then the increments. The transaction is
 * committed only after both steps have returned.
 *
 * @param actions rows to write; entries that are not {@code Mutation}s are
 *                logged and skipped
 * @param incs    increments to apply; coalesced first when
 *                {@code batchIncrements} is set
 * @param txn     transaction to commit once both steps have completed
 * @throws Exception if either privileged step or the commit fails
 */
private void putEventsAndCommit(final List<Row> actions,
    final List<Increment> incs, Transaction txn) throws Exception {
  // The same durability applies to every mutation issued by this call.
  final Durability walMode = enableWal ? Durability.USE_DEFAULT : Durability.SKIP_WAL;

  // Step 1: row actions.
  privilegedExecutor.execute((PrivilegedExceptionAction<Void>) () -> {
    final List<Mutation> batch = new ArrayList<>(actions.size());
    for (final Row row : actions) {
      if (row instanceof Put) {
        ((Put) row).setDurability(walMode);
      } else if (row instanceof Increment) {
        // Newer versions of HBase - Increment implements Row, so increments
        // can show up in the action list too.
        ((Increment) row).setDurability(walMode);
      }
      if (!(row instanceof Mutation)) {
        logger.warn("dropping row " + row + " since it is not an Increment or Put");
        continue;
      }
      batch.add((Mutation) row);
    }
    table.mutate(batch);
    table.flush();
    return null;
  });

  // Step 2: increments, optionally coalesced beforehand.
  privilegedExecutor.execute((PrivilegedExceptionAction<Void>) () -> {
    final List<Increment> toApply = batchIncrements ? coalesceIncrements(incs) : incs;
    // Hook that exists only for unit tests.
    if (debugIncrCallback != null) {
      debugIncrCallback.onAfterCoalesce(toApply);
    }
    for (final Increment increment : toApply) {
      increment.setDurability(walMode);
      table.mutate(increment);
    }
    table.flush();
    return null;
  });

  txn.commit();
  // The success count is driven by the number of row actions passed in.
  sinkCounter.addToEventDrainSuccessCount(actions.size());
}