void process()

in modules/core/src/main/java/org/apache/fluo/recipes/core/combine/CombineQueueImpl.java [168:310]
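
This method drains queued updates for a single bucket of the combine queue. It scans the bucket's pending update entries (resuming from a saved position when one exists), buffers them in memory grouped by key up to a size limit, combines each key's updates with its current value, writes back any values that changed, and reports those changes to the supplied ChangeObserver. When the bucket holds more data than fits in the buffer, it records a resume key and sets a weak notification so processing continues in a later transaction.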


  void process(TransactionBase tx, Bytes ntfyRow, Column col, Combiner<K, V> combiner,
      ChangeObserver<K, V> changeObserver) throws Exception {

    Preconditions.checkState(ntfyRow.startsWith(updatePrefix));

    // NEXT_COL holds the key at which a previous, partial scan of this bucket
    // stopped; when present, scanning resumes there.
    Bytes nextKey = tx.get(ntfyRow, NEXT_COL);

    Span span;

    if (nextKey != null) {
      // A previous invocation stopped partway through this bucket; start the scan
      // at the saved key instead of at the beginning of the bucket.
      Bytes startRow = Bytes.builder(ntfyRow.length() + nextKey.length()).append(ntfyRow)
          .append(nextKey).toBytes();
      Span prefixSpan = Span.prefix(ntfyRow);
      span = new Span(new RowColumn(startRow, UPDATE_COL), false, prefixSpan.getEnd(),
          prefixSpan.isEndInclusive());
    } else {
      span = Span.prefix(ntfyRow);
    }

    // Scan only update entries within the chosen span; each row/value pair is one
    // queued update for some key in this bucket.
    Iterator<RowColumnValue> iter = tx.scanner().over(span).fetch(UPDATE_COL).build().iterator();

    // Buffer of updates read so far, grouped by serialized key. approxMemUsed
    // tracks a rough memory estimate so a single pass never buffers more than
    // bufferSize bytes of the bucket.
    Map<Bytes, List<Bytes>> updates = new HashMap<>();

    long approxMemUsed = 0;

    Bytes partiallyReadKey = null;
    boolean setNextKey = false;

    if (iter.hasNext()) {
      Bytes lastKey = null;

      // Read queued updates into memory, grouped by serialized key, until the
      // buffer size limit is reached or the bucket is exhausted.
      while (iter.hasNext() && approxMemUsed < bufferSize) {
        RowColumnValue rcv = iter.next();
        Bytes curRow = rcv.getRow();

        // Each update is consumed as it is read; it is either folded into the
        // current value below or requeued if its key was only partially read.
        tx.delete(curRow, UPDATE_COL);

        Bytes serializedKey = getKeyFromUpdateRow(ntfyRow, curRow);
        lastKey = serializedKey;

        Bytes val = rcv.getValue();
        updates.computeIfAbsent(serializedKey, k -> new ArrayList<>()).add(val);

        approxMemUsed += curRow.length();
        approxMemUsed += val.length();
      }

      if (iter.hasNext()) {
        // The buffer filled before the bucket was exhausted; peek at the next
        // update to decide where the next invocation should resume.
        RowColumnValue rcv = iter.next();
        Bytes curRow = rcv.getRow();

        if (getKeyFromUpdateRow(ntfyRow, curRow).equals(lastKey)) {
          // There are still more updates for the last key read; resume at it.
          partiallyReadKey = lastKey;
          tx.set(ntfyRow, NEXT_COL, partiallyReadKey);
        } else {
          // The last key was fully read; resume at the next possible key.
          Bytes nextPossible =
              Bytes.builder(lastKey.length() + 1).append(lastKey).append(0).toBytes();
          tx.set(ntfyRow, NEXT_COL, nextPossible);
        }

        setNextKey = true;
      } else if (nextKey != null) {
        // The entire bucket was read; clear the resume pointer.
        tx.delete(ntfyRow, NEXT_COL);
      }
    } else if (nextKey != null) {
      // Nothing remained past the resume pointer; clear it.
      tx.delete(ntfyRow, NEXT_COL);
    }

    if (nextKey != null || setNextKey) {
      // Not all data was read, so this bucket must be processed again. If scanning
      // started in the middle of the bucket, it's possible there is new data before
      // nextKey that still needs to be processed. If scanning stopped before
      // reading the entire bucket, there may be data after the stop point.
      tx.setWeakNotification(ntfyRow, col);
    }

    // Translate the update bucket row into the prefix of the data rows that hold
    // the current values for this bucket.
    BytesBuilder rowBuilder = Bytes.builder();
    rowBuilder.append(dataPrefix);
    rowBuilder.append(ntfyRow.subSequence(updatePrefix.length(), ntfyRow.length()));
    int rowPrefixLen = rowBuilder.getLength();

    // Fetch current values for all fully read keys. A partially read key is
    // excluded because its combined updates are requeued rather than applied to
    // the data row below.
    Set<Bytes> keysToFetch = updates.keySet();
    if (partiallyReadKey != null) {
      final Bytes prk = partiallyReadKey;
      keysToFetch = Sets.filter(keysToFetch, b -> !b.equals(prk));
    }
    Map<Bytes, Map<Column, Bytes>> currentVals = getCurrentValues(tx, rowBuilder, keysToFetch);

    ArrayList<Change<K, V>> updatesToReport = new ArrayList<>(updates.size());

    // Combine each key's buffered updates with its current value.
    for (Entry<Bytes, List<Bytes>> entry : updates.entrySet()) {
      rowBuilder.setLength(rowPrefixLen);
      Bytes currentValueRow = rowBuilder.append(entry.getKey()).toBytes();
      Bytes currVal =
          currentVals.getOrDefault(currentValueRow, Collections.emptyMap()).get(DATA_COLUMN);

      K kd = serializer.deserialize(entry.getKey().toArray(), keyType);

      if (partiallyReadKey != null && partiallyReadKey.equals(entry.getKey())) {
        // Not all updates were read for this key, so combine what was read and
        // requeue the result as a new update rather than writing the data row.
        Optional<V> nv = combiner.combine(new InputImpl<>(kd, this::deserVal, entry.getValue()));
        if (nv.isPresent()) {
          addAll(tx, Collections.singletonMap(kd, nv.get()));
        }
      } else {
        Optional<V> nv =
            combiner.combine(new InputImpl<>(kd, this::deserVal, currVal, entry.getValue()));
        Bytes newVal = nv.isPresent() ? Bytes.of(serializer.serialize(nv.get())) : null;
        // Write and report only when the combined value differs from the current
        // value, including transitions to or from absent.
        if (newVal != null ^ currVal != null || (currVal != null && !currVal.equals(newVal))) {
          if (newVal == null) {
            tx.delete(currentValueRow, DATA_COLUMN);
          } else {
            tx.set(currentValueRow, DATA_COLUMN, newVal);
          }

          Optional<V> cvd = Optional.ofNullable(currVal).map(this::deserVal);
          updatesToReport.add(new ChangeImpl<>(kd, cvd, nv));
        }
      }
    }

    // TODO entries could be cleared as they are converted to objects to avoid
    // double memory usage
    updates.clear();
    currentVals.clear();

    if (!updatesToReport.isEmpty()) {
      changeObserver.process(tx, updatesToReport);
    }
  }
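
For orientation, the following is a minimal sketch (not part of the source file) of the two callbacks this method drives. It assumes the interfaces behave as the code above suggests: Combiner.combine receives an Input whose stream() yields the buffered updates (plus the current value, when one exists), both interfaces are usable as lambdas, and ChangeObserver.process sees only keys whose combined value actually changed. The word-count semantics are purely illustrative, and the observer registration wiring is omitted.

  // Sum every update for a key; returning Optional.empty() instead would cause
  // process() to delete the key's current value (see the deletion branch above).
  Combiner<String, Long> sumCombiner =
      input -> Optional.of(input.stream().mapToLong(Long::longValue).sum());

  // Invoked once per batch with the keys whose combined value changed;
  // getNewValue() is empty when the key was deleted.
  ChangeObserver<String, Long> changeLogger = (tx, changes) -> {
    for (Change<String, Long> change : changes) {
      System.out.println(change.getKey() + " -> " + change.getNewValue());
    }
  };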