in modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java [70:90]
/**
 * Adds every export produced by the iterator to this queue within the given transaction.
 *
 * <p>For each export, the key and value are serialized, the serialized key is hashed with
 * murmur3_32 to select a bucket id, and the serialized pair is added to that bucket together
 * with the transaction's start timestamp. The export observer of a given bucket id is notified
 * at most once per call, on the first export that lands in it.
 *
 * @param tx transaction used to construct each bucket and to supply the start timestamp
 * @param exports iterator over the exports to add; it is consumed until exhausted
 */
public void addAll(TransactionBase tx, Iterator<Export<K, V>> exports) {
  // Bucket ids whose observer has already been notified during this call.
  final Set<Integer> notifiedBucketIds = new HashSet<>();

  while (exports.hasNext()) {
    final Export<K, V> current = exports.next();

    final byte[] keyBytes = serializer.serialize(current.getKey());
    final byte[] valueBytes = serializer.serialize(current.getValue());

    // The bucket is chosen from the serialized key only; the sign is stripped after the
    // remainder is taken.
    final int keyHash = Hashing.murmur3_32().hashBytes(keyBytes).asInt();
    final int targetBucketId = Math.abs(keyHash % numBuckets);

    final ExportBucket targetBucket = new ExportBucket(tx, queueId, targetBucketId, numBuckets);
    targetBucket.add(tx.getStartTimestamp(), keyBytes, valueBytes);

    // Set.add returns true only on first insertion, so each bucket id is notified once.
    final boolean firstExportForBucket = notifiedBucketIds.add(targetBucketId);
    if (firstExportForBucket) {
      targetBucket.notifyExportObserver();
    }
  }
}