Result getRangeImpl()

in src/common/kv/mem/MemTransaction.h [292:365]


  Result<GetRangeResult> getRangeImpl(const KeySelector &begin, const KeySelector &end, int32_t limit, bool snapshot) {
    std::scoped_lock<std::mutex> guard(mutex_);
    if (canceled_) {
      return makeError(TransactionCode::kCanceled, "Canceled transaction!");
    }
    // always get all kvs in range
    auto result = mem_.getRange(begin, end, std::numeric_limits<int32_t>::max(), readVersion());
    RETURN_ON_ERROR(result);

    auto changeIt = changes_.lower_bound(begin.key);
    if (changeIt != changes_.end() && !begin.inclusive) ++changeIt;

    if (changeIt != changes_.end()) {
      std::vector<KeyValue> newKvs;
      auto originIt = result->kvs.begin();
      auto originEnded = [&] { return originIt == result->kvs.end(); };
      auto changeEnded = [&] {
        if (changeIt == changes_.end()) return true;
        if (end.inclusive && changeIt->first > end.key) return true;
        if (!end.inclusive && changeIt->first >= end.key) return true;
        if (result->hasMore) {
          assert(!result->kvs.empty());
          if (changeIt->first > result->kvs.back().key) return true;
        }
        return false;
      };
      auto pushChange = [&] {
        if (changeIt->second.has_value()) newKvs.emplace_back(changeIt->first, *changeIt->second);
      };
      for (;;) {
        if (originEnded() || changeEnded()) break;
        if (originIt->key < changeIt->first) {
          newKvs.push_back(*originIt);
          ++originIt;
        } else if (originIt->key > changeIt->first) {
          pushChange();
          ++changeIt;
        } else {
          pushChange();
          ++changeIt;
          ++originIt;
        }
      }
      for (; !originEnded(); ++originIt) {
        newKvs.push_back(*originIt);
      }
      for (; !changeEnded(); ++changeIt) {
        pushChange();
      }
      result->kvs = std::move(newKvs);
    }

    if (limit < 1) {
      limit = mem::memKvDefaultLimit;
    }
    if (result->kvs.size() > (size_t)limit) {
      result->kvs = std::vector(&result->kvs[0], &result->kvs[(size_t)limit]);
      result->hasMore = true;
      assert(result->kvs.size() == (size_t)limit);
    }

    String rangeBegin = begin.inclusive ? String(begin.key) : TransactionHelper::keyAfter(begin.key);
    String rangeEnd;
    if (result->hasMore) {
      rangeEnd = TransactionHelper::keyAfter(result->kvs.end()[-1].key);
    } else {
      rangeEnd = end.inclusive ? TransactionHelper::keyAfter(end.key) : String(end.key);
    }
    if (!snapshot) {
      readRanges_.push_back({rangeBegin, rangeEnd});
    }

    return result;
  }