in modules/core/src/main/java/org/apache/fluo/recipes/core/combine/CombineQueueImpl.java [168:310]
/**
 * Handles one notification for the update bucket rooted at {@code ntfyRow}.
 *
 * <p>Queued updates under {@code ntfyRow} are scanned, deleted and grouped by serialized key until
 * roughly {@code bufferSize} bytes of rows and values have been buffered. When the scan stops
 * before the end of the bucket, a resume key is stored in {@code NEXT_COL} and a weak notification
 * is set so the remainder is handled by a later run. Each fully read key is combined with its
 * currently stored value and the data column is written or deleted when the result differs; a key
 * whose updates were only partially read has its buffered updates combined and queued again via
 * {@code addAll}. Keys whose stored value changed are passed to {@code changeObserver}.
 *
 * @param tx transaction used for all reads and writes
 * @param ntfyRow row that was notified; must start with {@code updatePrefix}
 * @param col column used when re-arming the weak notification on {@code ntfyRow}
 * @param combiner merges the queued updates (and, for fully read keys, the stored value)
 * @param changeObserver invoked once with all changed keys, only when there is at least one
 * @throws Exception propagated from the operations invoked here
 */
void process(TransactionBase tx, Bytes ntfyRow, Column col, Combiner<K, V> combiner,
    ChangeObserver<K, V> changeObserver) throws Exception {
  Preconditions.checkState(ntfyRow.startsWith(updatePrefix));

  // A previous run may have stopped part way through this bucket and left a resume key behind.
  final Bytes resumeKey = tx.get(ntfyRow, NEXT_COL);
  final boolean resuming = resumeKey != null;

  Span scanRange = Span.prefix(ntfyRow);
  if (resuming) {
    // Keep the end of the bucket's prefix span, but start (exclusively) at the resume position.
    Bytes resumeRow = Bytes.builder(ntfyRow.length() + resumeKey.length()).append(ntfyRow)
        .append(resumeKey).toBytes();
    scanRange = new Span(new RowColumn(resumeRow, UPDATE_COL), false, scanRange.getEnd(),
        scanRange.isEndInclusive());
  }

  Iterator<RowColumnValue> pending =
      tx.scanner().over(scanRange).fetch(UPDATE_COL).build().iterator();

  Map<Bytes, List<Bytes>> updatesByKey = new HashMap<>();
  Bytes partialKey = null;
  boolean stoppedEarly = false;

  if (pending.hasNext()) {
    Bytes lastKeyRead = null;
    long bytesBuffered = 0;

    // NOTE(review): if bufferSize is not positive this loop reads nothing and lastKeyRead stays
    // null for the code below - confirm bufferSize is validated elsewhere.
    while (pending.hasNext() && bytesBuffered < bufferSize) {
      RowColumnValue queued = pending.next();
      Bytes queuedRow = queued.getRow();

      // every update that gets buffered is removed from the queue
      tx.delete(queuedRow, UPDATE_COL);

      lastKeyRead = getKeyFromUpdateRow(ntfyRow, queuedRow);

      Bytes queuedVal = queued.getValue();
      updatesByKey.computeIfAbsent(lastKeyRead, unused -> new ArrayList<>()).add(queuedVal);

      bytesBuffered += queuedRow.length();
      bytesBuffered += queuedVal.length();
    }

    if (pending.hasNext()) {
      // Stopped before the end of the bucket; look at the first unread entry to decide where
      // the next run should begin.
      Bytes firstUnreadKey = getKeyFromUpdateRow(ntfyRow, pending.next().getRow());
      if (firstUnreadKey.equals(lastKeyRead)) {
        // the last key still has unread updates, so the next run starts at that same key
        partialKey = lastKeyRead;
        tx.set(ntfyRow, NEXT_COL, partialKey);
      } else {
        // the last key was read completely, so the next run starts at the next possible key
        Bytes keyAfterLast =
            Bytes.builder(lastKeyRead.length() + 1).append(lastKeyRead).append(0).toBytes();
        tx.set(ntfyRow, NEXT_COL, keyAfterLast);
      }
      stoppedEarly = true;
    } else if (resuming) {
      // reached the end of the bucket, so the stored resume key is cleared
      tx.delete(ntfyRow, NEXT_COL);
    }
  } else if (resuming) {
    // nothing found past the resume position, so the stored resume key is cleared
    tx.delete(ntfyRow, NEXT_COL);
  }

  if (resuming || stoppedEarly) {
    // Another run is required: a scan that began mid-bucket may have skipped new data located
    // before the resume key, and a scan that stopped early left data after the stop point.
    tx.setWeakNotification(ntfyRow, col);
  }

  // Data rows are the data prefix, then the part of ntfyRow after the update prefix, then the key.
  BytesBuilder rowBuilder = Bytes.builder().append(dataPrefix)
      .append(ntfyRow.subSequence(updatePrefix.length(), ntfyRow.length()));
  final int dataRowPrefixLen = rowBuilder.getLength();

  // The partially read key is not merged with its stored value, so its value is not fetched.
  final Bytes skippedKey = partialKey;
  Set<Bytes> keysToFetch = skippedKey == null ? updatesByKey.keySet()
      : Sets.filter(updatesByKey.keySet(), candidate -> !candidate.equals(skippedKey));
  Map<Bytes, Map<Column, Bytes>> storedVals = getCurrentValues(tx, rowBuilder, keysToFetch);

  ArrayList<Change<K, V>> changes = new ArrayList<>(updatesByKey.size());

  for (Entry<Bytes, List<Bytes>> queuedEntry : updatesByKey.entrySet()) {
    Bytes serKey = queuedEntry.getKey();
    List<Bytes> serUpdates = queuedEntry.getValue();

    rowBuilder.setLength(dataRowPrefixLen);
    Bytes dataRow = rowBuilder.append(serKey).toBytes();
    Bytes storedVal =
        storedVals.getOrDefault(dataRow, Collections.emptyMap()).get(DATA_COLUMN);

    K key = serializer.deserialize(serKey.toArray(), keyType);

    if (partialKey != null && partialKey.equals(serKey)) {
      // Only some of this key's updates were read: collapse what was read into a single update
      // and queue it again rather than touching the stored value.
      Optional<V> collapsed = combiner.combine(new InputImpl<>(key, this::deserVal, serUpdates));
      if (collapsed.isPresent()) {
        addAll(tx, Collections.singletonMap(key, collapsed.get()));
      }
      continue;
    }

    Optional<V> combined =
        combiner.combine(new InputImpl<>(key, this::deserVal, storedVal, serUpdates));
    Bytes newVal = combined.isPresent() ? Bytes.of(serializer.serialize(combined.get())) : null;

    // changed when exactly one side is absent, or both are present and differ
    boolean changed = (storedVal == null) != (newVal == null)
        || (storedVal != null && !storedVal.equals(newVal));
    if (!changed) {
      continue;
    }

    if (newVal == null) {
      tx.delete(dataRow, DATA_COLUMN);
    } else {
      tx.set(dataRow, DATA_COLUMN, newVal);
    }

    Optional<V> previous = Optional.ofNullable(storedVal).map(this::deserVal);
    changes.add(new ChangeImpl<>(key, previous, combined));
  }

  // TODO could clear these as converted to objects to avoid double memory usage
  updatesByKey.clear();
  storedVals.clear();

  if (!changes.isEmpty()) {
    changeObserver.process(tx, changes);
  }
}