in src/storage/redis_db.cc [771:919]
rocksdb::Status Database::Sort(engine::Context &ctx, RedisType type, const std::string &key, const SortArgument &args,
std::vector<std::optional<std::string>> *elems, SortResult *res) {
// Obtain the length of the object to sort.
const std::string ns_key = AppendNamespacePrefix(key);
Metadata metadata(type, false);
auto s = GetMetadata(ctx, {type}, ns_key, &metadata);
if (!s.ok()) return s;
if (metadata.size > SORT_LENGTH_LIMIT) {
*res = SortResult::LIMIT_EXCEEDED;
return rocksdb::Status::OK();
}
auto vectorlen = static_cast<int>(metadata.size);
// Adjust the offset and count of the limit
int offset = args.offset >= vectorlen ? 0 : std::clamp(args.offset, 0, vectorlen - 1);
int count = args.offset >= vectorlen ? 0 : std::clamp(args.count, -1, vectorlen - offset);
if (count == -1) count = vectorlen - offset;
// Get the elements that need to be sorted
std::vector<std::string> str_vec;
if (count != 0) {
if (type == RedisType::kRedisList) {
auto list_db = redis::List(storage_, namespace_);
if (args.dontsort) {
if (args.desc) {
s = list_db.Range(ctx, key, -count - offset, -1 - offset, &str_vec);
if (!s.ok()) return s;
std::reverse(str_vec.begin(), str_vec.end());
} else {
s = list_db.Range(ctx, key, offset, offset + count - 1, &str_vec);
if (!s.ok()) return s;
}
} else {
s = list_db.Range(ctx, key, 0, -1, &str_vec);
if (!s.ok()) return s;
}
} else if (type == RedisType::kRedisSet) {
auto set_db = redis::Set(storage_, namespace_);
s = set_db.Members(ctx, key, &str_vec);
if (!s.ok()) return s;
if (args.dontsort) {
str_vec = std::vector(std::make_move_iterator(str_vec.begin() + offset),
std::make_move_iterator(str_vec.begin() + offset + count));
}
} else if (type == RedisType::kRedisZSet) {
auto zset_db = redis::ZSet(storage_, namespace_);
std::vector<MemberScore> member_scores;
if (args.dontsort) {
RangeRankSpec spec;
spec.start = offset;
spec.stop = offset + count - 1;
spec.reversed = args.desc;
s = zset_db.RangeByRank(ctx, key, spec, &member_scores, nullptr);
if (!s.ok()) return s;
for (auto &member_score : member_scores) {
str_vec.emplace_back(std::move(member_score.member));
}
} else {
s = zset_db.GetAllMemberScores(ctx, key, &member_scores);
if (!s.ok()) return s;
for (auto &member_score : member_scores) {
str_vec.emplace_back(std::move(member_score.member));
}
}
} else {
*res = SortResult::UNKNOWN_TYPE;
return s;
}
}
std::vector<RedisSortObject> sort_vec(str_vec.size());
for (size_t i = 0; i < str_vec.size(); ++i) {
sort_vec[i].obj = str_vec[i];
}
// Sort by BY, ALPHA, ASC/DESC
if (!args.dontsort) {
for (size_t i = 0; i < sort_vec.size(); ++i) {
std::string byval;
if (!args.sortby.empty()) {
auto lookup = lookupKeyByPattern(ctx, args.sortby, str_vec[i]);
if (!lookup.has_value()) continue;
byval = std::move(lookup.value());
} else {
byval = str_vec[i];
}
if (args.alpha && !args.sortby.empty()) {
sort_vec[i].v = byval;
} else if (!args.alpha && !byval.empty()) {
auto double_byval = ParseFloat<double>(byval);
if (!double_byval) {
*res = SortResult::DOUBLE_CONVERT_ERROR;
return rocksdb::Status::OK();
}
sort_vec[i].v = *double_byval;
}
}
std::sort(sort_vec.begin(), sort_vec.end(), [&args](const RedisSortObject &a, const RedisSortObject &b) {
return RedisSortObject::SortCompare(a, b, args);
});
// Gets the element specified by Limit
if (offset != 0 || count != vectorlen) {
sort_vec = std::vector(std::make_move_iterator(sort_vec.begin() + offset),
std::make_move_iterator(sort_vec.begin() + offset + count));
}
}
// Perform storage
for (auto &elem : sort_vec) {
if (args.getpatterns.empty()) {
elems->emplace_back(elem.obj);
}
for (const std::string &pattern : args.getpatterns) {
std::optional<std::string> val = lookupKeyByPattern(ctx, pattern, elem.obj);
if (val.has_value()) {
elems->emplace_back(val.value());
} else {
elems->emplace_back(std::nullopt);
}
}
}
if (!args.storekey.empty()) {
std::vector<std::string> store_elems;
store_elems.reserve(elems->size());
for (const auto &e : *elems) {
store_elems.emplace_back(e.value_or(""));
}
redis::List list_db(storage_, namespace_);
s = list_db.Trim(ctx, args.storekey, -1, 0);
if (!s.ok()) return s;
uint64_t new_size = 0;
s = list_db.Push(ctx, args.storekey, std::vector<Slice>(store_elems.cbegin(), store_elems.cend()), false,
&new_size);
if (!s.ok()) return s;
}
return rocksdb::Status::OK();
}