public void addAll()

in modules/core/src/main/java/org/apache/fluo/recipes/core/combine/CombineQueueImpl.java [102:139]


  /**
   * Queues every entry of {@code updates} as a pending update and flags each bucket that received
   * at least one entry.
   *
   * <p>
   * For each entry the serialized key is hashed with murmur3 (32 bit) and mapped onto one of
   * {@code numBuckets} buckets. The serialized value is then written to {@code UPDATE_COL} of the
   * row {@code updatePrefix + bucketId + ':' + serializedKey + encodedStartTimestamp}. After all
   * entries are written, a weak notification is set on {@code notifyColumn} of the row
   * {@code updatePrefix + bucketId + ':'} once per distinct bucket touched.
   *
   * @param tx transaction used for all writes and notifications; its start timestamp becomes the
   *        suffix of every update row
   * @param updates key/value pairs to queue; an empty map results in no writes and no
   *        notifications
   * @throws IllegalStateException if {@code numBuckets} is not positive, i.e. the state check
   *         "Not initialized" fails
   */
  public void addAll(TransactionBase tx, Map<K, V> updates) {
    Preconditions.checkState(numBuckets > 0, "Not initialized");

    // distinct buckets written to by this call, each gets one notification at the end
    Set<String> touchedBuckets = new HashSet<>();

    // a single builder is reused for every row; it is truncated back to the prefix before each use
    BytesBuilder row = Bytes.builder();
    row.append(updatePrefix);
    int prefixEnd = row.getLength();

    byte[] encodedStartTs = encSeq(tx.getStartTimestamp());

    for (Entry<K, V> update : updates.entrySet()) {
      byte[] keyBytes = serializer.serialize(update.getKey());
      int keyHash = Hashing.murmur3_32().hashBytes(keyBytes).asInt();
      // the remainder's magnitude is always below numBuckets, so taking abs of it cannot overflow
      int bucketIndex = Math.abs(keyHash % numBuckets);
      String bucket = genBucketId(bucketIndex, numBuckets);

      // drop whatever the previous iteration appended after the common prefix
      row.setLength(prefixEnd);
      row.append(bucket).append(':').append(keyBytes).append(encodedStartTs);

      Bytes updateRow = row.toBytes();
      Bytes updateValue = Bytes.of(serializer.serialize(update.getValue()));

      // TODO a "set if not exists" would be comforting here; this is an unconditional write that
      // relies on bucketId + key + encoded start timestamp never colliding
      tx.set(updateRow, UPDATE_COL, updateValue);

      touchedBuckets.add(bucket);
    }

    for (String bucket : touchedBuckets) {
      row.setLength(prefixEnd);
      Bytes bucketRow = row.append(bucket).append(':').toBytes();

      tx.setWeakNotification(bucketRow, notifyColumn);
    }
  }