in modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/function/AccumuloTranslator.java [62:85]
public static void generateMutations(long seq, Map<RowColumn, Bytes> oldData,
Map<RowColumn, Bytes> newData, Consumer<Mutation> consumer) {
Map<Bytes, Mutation> mutationMap = new HashMap<>();
for (Map.Entry<RowColumn, Bytes> entry : oldData.entrySet()) {
RowColumn rc = entry.getKey();
if (!newData.containsKey(rc)) {
Mutation m = mutationMap.computeIfAbsent(rc.getRow(), r -> new Mutation(r.toArray()));
m.putDelete(rc.getColumn().getFamily().toArray(), rc.getColumn().getQualifier().toArray(),
seq);
}
}
for (Map.Entry<RowColumn, Bytes> entry : newData.entrySet()) {
RowColumn rc = entry.getKey();
Column col = rc.getColumn();
Bytes newVal = entry.getValue();
Bytes oldVal = oldData.get(rc);
if (oldVal == null || !oldVal.equals(newVal)) {
Mutation m = mutationMap.computeIfAbsent(rc.getRow(), r -> new Mutation(r.toArray()));
m.put(col.getFamily().toArray(), col.getQualifier().toArray(), seq, newVal.toArray());
}
}
mutationMap.values().forEach(consumer);
}