in modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserverImpl.java [53:86]
/**
 * Hands the pending entries of the export bucket identified by {@code row} to the exporter,
 * then updates the bucket's continue row and notification state based on how far the run got.
 *
 * @param tx transaction used to construct the {@link ExportBucket}
 * @param row row identifying the bucket to process
 * @param col column that triggered this call; not read by this method
 * @throws Exception declared by the signature; any failure from the bucket, the serializer or
 *         the exporter is passed on to the caller
 */
public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
  ExportBucket bucket = new ExportBucket(tx, row);

  // Non-null when the bucket has a stored continue row; iteration starts from it.
  Bytes continueRow = bucket.getContinueRow();

  Iterator<ExportEntry> input = bucket.getExportIterator(continueRow);
  MemLimitIterator memLimitIter = new MemLimitIterator(input, memLimit, 8 + queueId.length());

  // Decode each raw entry lazily into a typed, sequenced export.
  Iterator<SequencedExport<K, V>> decodedExports = Iterators.transform(memLimitIter,
      entry -> new SequencedExport<>(serializer.deserialize(entry.key, keyType),
          serializer.deserialize(entry.value, valType), entry.seq));

  // The consuming wrapper calls remove() on the underlying iterator after each element is
  // returned, so every export handed to the exporter is also removed from the source iterator.
  Iterator<SequencedExport<K, V>> consumingExports = Iterators.consumingIterator(decodedExports);

  exporter.export(consumingExports);

  if (input.hasNext() || continueRow != null) {
    // Either entries are still pending, or this run began at a continue row and new data may
    // have been inserted above it; in both cases the observer notifies itself to run again.
    bucket.notifyExportObserver();
  }

  if (input.hasNext() && !memLimitIter.hasNext()) {
    // Input remains but the limited iterator refuses to yield more: the memory limit stopped
    // this run, so record the next unprocessed entry as the new continue row.
    bucket.setContinueRow(input.next());
  } else if (continueRow != null) {
    // No new continue row was recorded, so discard the one this run started from.
    bucket.clearContinueRow();
  }
}