public void process()

in modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserverImpl.java [53:86]


  /**
   * Hands the export entries of the bucket at {@code row} to the exporter and then updates the
   * bucket's continue row and notification state.
   *
   * <p>
   * Entries are read starting from the bucket's stored continue row (which may be {@code null}),
   * wrapped in a {@link MemLimitIterator}, deserialized into {@link SequencedExport} objects and
   * passed to {@code exporter.export} through a consuming iterator.
   *
   * @param tx transaction used to construct the {@link ExportBucket}
   * @param row row used to construct the {@link ExportBucket}
   * @param col not read by this method
   * @throws Exception declared by the signature; this method does not catch anything itself
   */
  public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
    ExportBucket bucket = new ExportBucket(tx, row);

    Bytes continueRow = bucket.getContinueRow();
    // A previously stored continue row is cleared at the end unless a new one replaces it.
    boolean clearStoredContinueRow = continueRow != null;

    Iterator<ExportEntry> entries = bucket.getExportIterator(continueRow);
    MemLimitIterator limited = new MemLimitIterator(entries, memLimit, 8 + queueId.length());

    Iterator<SequencedExport<K, V>> decoded = Iterators.transform(limited,
        entry -> new SequencedExport<>(serializer.deserialize(entry.key, keyType),
            serializer.deserialize(entry.value, valType), entry.seq));
    Iterator<SequencedExport<K, V>> consuming = Iterators.consumingIterator(decoded);

    exporter.export(consuming);

    // Re-notify when entries remain unread, or when this run started from a continue row (new
    // data may have been inserted above it).
    if (entries.hasNext() || clearStoredContinueRow) {
      bucket.notifyExportObserver();
    }

    // Entries remain but the limited view is exhausted: the memory limit stopped the export, so
    // record the next entry as the new continue row and keep it.
    if (entries.hasNext() && !limited.hasNext()) {
      bucket.setContinueRow(entries.next());
      clearStoredContinueRow = false;
    }

    if (clearStoredContinueRow) {
      bucket.clearContinueRow();
    }
  }