in modules/core/src/main/java/org/apache/fluo/recipes/core/combine/CombineQueueImpl.java [102:139]
public void addAll(TransactionBase tx, Map<K, V> updates) {
  // Queues every entry of `updates` as its own update row and then flags each bucket that
  // received at least one row with a weak notification.
  //
  // Update row layout:  updatePrefix + bucketId + ':' + serialized key + encoded start timestamp
  // Notification row:   updatePrefix + bucketId + ':'
  //
  // Fails with IllegalStateException ("Not initialized") when numBuckets is not positive.
  Preconditions.checkState(numBuckets > 0, "Not initialized");

  // Ids of the buckets written to by this call; each one is notified once at the end.
  Set<String> touchedBuckets = new HashSet<>();

  // A single builder is reused for every row. It is truncated back to the shared prefix
  // before each row is assembled.
  BytesBuilder rowBuf = Bytes.builder();
  rowBuf.append(updatePrefix);
  int commonPrefixLen = rowBuf.getLength();

  // The same encoded start timestamp is appended to every row written by this call.
  byte[] encodedStartTs = encSeq(tx.getStartTimestamp());

  for (Entry<K, V> update : updates.entrySet()) {
    byte[] keyBytes = serializer.serialize(update.getKey());

    // The bucket is chosen from a murmur3 hash of the serialized key. The remainder's
    // magnitude is always below numBuckets, so Math.abs cannot overflow here.
    int keyHash = Hashing.murmur3_32().hashBytes(keyBytes).asInt();
    int bucketIndex = Math.abs(keyHash % numBuckets);
    String bucketId = genBucketId(bucketIndex, numBuckets);

    // Drop whatever the previous iteration appended after the shared prefix.
    rowBuf.setLength(commonPrefixLen);
    rowBuf.append(bucketId);
    rowBuf.append(':');
    rowBuf.append(keyBytes);
    rowBuf.append(encodedStartTs);
    Bytes updateRow = rowBuf.toBytes();

    Bytes serializedValue = Bytes.of(serializer.serialize(update.getValue()));

    // TODO a set-if-not-exists would be comforting here. Rows are
    // bucketId + key + start timestamp, so collisions are presumably not expected
    // (assumes start timestamps differ between transactions - confirm).
    tx.set(updateRow, UPDATE_COL, serializedValue);

    touchedBuckets.add(bucketId);
  }

  // One weak notification per bucket that received an update.
  for (String touchedBucket : touchedBuckets) {
    rowBuf.setLength(commonPrefixLen);
    Bytes bucketRow = rowBuf.append(touchedBucket).append(':').toBytes();
    tx.setWeakNotification(bucketRow, notifyColumn);
  }
}