in modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java [78:104]
/**
 * Translates the SET and DELETE entries of a transaction log into mutations, grouping them so
 * that exactly one {@link Mutation} is produced per distinct row.
 *
 * <p>Log entries whose operation is neither SET nor DELETE are skipped. When an entry's column
 * has its visibility set, that visibility is attached to the update; otherwise the overload
 * without a visibility argument is used.
 *
 * @param seq value passed as the long (timestamp) argument of every put and delete
 * @param txLog transaction log whose entries are converted
 * @param consumer receives each per-row mutation once all log entries have been processed; the
 *        mutations are delivered in {@link HashMap} iteration order, which is unspecified
 */
public static void generateMutations(long seq, TxLog txLog, Consumer<Mutation> consumer) {
  Map<Bytes, Mutation> mutationsByRow = new HashMap<>();

  for (LogEntry entry : txLog.getLogEntries()) {
    LogEntry.Operation operation = entry.getOp();
    Column column = entry.getColumn();
    byte[] family = column.getFamily().toArray();
    byte[] qualifier = column.getQualifier().toArray();
    byte[] visibility = column.getVisibility().toArray();

    boolean isDelete = operation.equals(LogEntry.Operation.DELETE);
    if (!isDelete && !operation.equals(LogEntry.Operation.SET)) {
      // Only writes and deletes are turned into mutations.
      continue;
    }

    // All updates that touch the same row are collected into a single mutation.
    Mutation rowMutation =
        mutationsByRow.computeIfAbsent(entry.getRow(), row -> new Mutation(row.toArray()));
    boolean hasVisibility = column.isVisibilitySet();

    if (isDelete && hasVisibility) {
      rowMutation.putDelete(family, qualifier, new ColumnVisibility(visibility), seq);
    } else if (isDelete) {
      rowMutation.putDelete(family, qualifier, seq);
    } else if (hasVisibility) {
      rowMutation.put(family, qualifier, new ColumnVisibility(visibility), seq,
          entry.getValue().toArray());
    } else {
      rowMutation.put(family, qualifier, seq, entry.getValue().toArray());
    }
  }

  mutationsByRow.values().forEach(consumer);
}