in flume-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java [453:493]
private List<Increment> coalesceIncrements(Iterable<Increment> incs) {
Preconditions.checkNotNull(incs, "List of Increments must not be null");
// Aggregate all of the increment row/family/column counts.
// The nested map is keyed like this: {row, family, qualifier} => count.
Map<byte[], Map<byte[], NavigableMap<byte[], Long>>> counters =
Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
for (Increment inc : incs) {
byte[] row = inc.getRow();
Map<byte[], NavigableMap<byte[], Long>> families = getFamilyMap(inc);
for (Map.Entry<byte[], NavigableMap<byte[], Long>> familyEntry : families.entrySet()) {
byte[] family = familyEntry.getKey();
NavigableMap<byte[], Long> qualifiers = familyEntry.getValue();
for (Map.Entry<byte[], Long> qualifierEntry : qualifiers.entrySet()) {
byte[] qualifier = qualifierEntry.getKey();
Long count = qualifierEntry.getValue();
incrementCounter(counters, row, family, qualifier, count);
}
}
}
// Reconstruct list of Increments per unique row/family/qualifier.
List<Increment> coalesced = Lists.newLinkedList();
for (Map.Entry<byte[], Map<byte[], NavigableMap<byte[], Long>>> rowEntry :
counters.entrySet()) {
byte[] row = rowEntry.getKey();
Map<byte[], NavigableMap<byte[], Long>> families = rowEntry.getValue();
Increment inc = new Increment(row);
for (Map.Entry<byte[], NavigableMap<byte[], Long>> familyEntry : families.entrySet()) {
byte[] family = familyEntry.getKey();
NavigableMap<byte[], Long> qualifiers = familyEntry.getValue();
for (Map.Entry<byte[], Long> qualifierEntry : qualifiers.entrySet()) {
byte[] qualifier = qualifierEntry.getKey();
long count = qualifierEntry.getValue();
inc.addColumn(family, qualifier, count);
}
}
coalesced.add(inc);
}
return coalesced;
}