in src/common/kv/mem/MemTransaction.h [292:365]
/// Range read as seen by this transaction: the kvs returned by `mem_` at `readVersion()`
/// merged with the entries buffered in `changes_`.
///
/// @param begin    Lower bound selector; `begin.inclusive` decides whether `begin.key` itself is in range.
/// @param end      Upper bound selector; `end.inclusive` decides whether `end.key` itself is in range.
/// @param limit    Maximum number of kvs to return; values below 1 fall back to `mem::memKvDefaultLimit`.
/// @param snapshot When false, the key range actually covered is appended to `readRanges_`.
/// @return The merged (and possibly truncated) result, with `hasMore` set when it was truncated;
///         a `TransactionCode::kCanceled` error if the transaction was canceled; or whatever
///         error `mem_.getRange` produced.
///
/// The whole call runs with `mutex_` held.
Result<GetRangeResult> getRangeImpl(const KeySelector &begin, const KeySelector &end, int32_t limit, bool snapshot) {
  std::scoped_lock<std::mutex> guard(mutex_);
  if (canceled_) {
    return makeError(TransactionCode::kCanceled, "Canceled transaction!");
  }

  // Always get all kvs in range: the caller's limit is applied only after the merge below.
  auto result = mem_.getRange(begin, end, std::numeric_limits<int32_t>::max(), readVersion());
  RETURN_ON_ERROR(result);

  // lower_bound yields the first buffered change with key >= begin.key. For an exclusive begin only
  // an entry sitting exactly on begin.key is out of range; a strictly greater key must be kept.
  auto changeIt = changes_.lower_bound(begin.key);
  if (changeIt != changes_.end() && !begin.inclusive && changeIt->first == begin.key) ++changeIt;

  if (changeIt != changes_.end()) {
    std::vector<KeyValue> newKvs;
    auto originIt = result->kvs.begin();
    auto originEnded = [&] { return originIt == result->kvs.end(); };
    // True once changeIt has left the requested range (or the part of it that was actually fetched).
    auto changeEnded = [&] {
      if (changeIt == changes_.end()) return true;
      if (end.inclusive && changeIt->first > end.key) return true;
      if (!end.inclusive && changeIt->first >= end.key) return true;
      if (result->hasMore) {
        // The fetch was truncated, so changes past the last fetched key are not merged here.
        assert(!result->kvs.empty());
        if (changeIt->first > result->kvs.back().key) return true;
      }
      return false;
    };
    // A change that holds a value is emitted; one without a value contributes nothing.
    auto pushChange = [&] {
      if (changeIt->second.has_value()) newKvs.emplace_back(changeIt->first, *changeIt->second);
    };

    // Two-way merge in key order. On equal keys the buffered change wins over the fetched kv.
    for (;;) {
      if (originEnded() || changeEnded()) break;
      if (originIt->key < changeIt->first) {
        newKvs.push_back(*originIt);
        ++originIt;
      } else if (originIt->key > changeIt->first) {
        pushChange();
        ++changeIt;
      } else {
        pushChange();
        ++changeIt;
        ++originIt;
      }
    }
    // Drain whichever side still has entries.
    for (; !originEnded(); ++originIt) {
      newKvs.push_back(*originIt);
    }
    for (; !changeEnded(); ++changeIt) {
      pushChange();
    }
    result->kvs = std::move(newKvs);
  }

  if (limit < 1) {
    limit = mem::memKvDefaultLimit;
  }
  if (result->kvs.size() > static_cast<size_t>(limit)) {
    // Drop the tail in place instead of copying the head into a fresh vector.
    result->kvs.erase(result->kvs.begin() + limit, result->kvs.end());
    result->hasMore = true;
    assert(result->kvs.size() == static_cast<size_t>(limit));
  }

  // Key range covered by this read: when the result is truncated it stops right after the last
  // returned key, otherwise it extends to the requested end.
  String rangeBegin = begin.inclusive ? String(begin.key) : TransactionHelper::keyAfter(begin.key);
  String rangeEnd;
  if (result->hasMore) {
    rangeEnd = TransactionHelper::keyAfter(result->kvs.back().key);
  } else {
    rangeEnd = end.inclusive ? TransactionHelper::keyAfter(end.key) : String(end.key);
  }
  if (!snapshot) {
    readRanges_.push_back({rangeBegin, rangeEnd});
  }
  return result;
}