rocksdb::Status Database::Sort()

in src/storage/redis_db.cc [771:919]


// Sorts (or, with `args.dontsort`, only slices) the elements of the list, set or sorted set stored at `key`
// and appends the selected output to `*elems`.
//
// Steps performed below:
//   1. Read the metadata of `key`; a failed lookup is returned unchanged. If the collection holds more than
//      SORT_LENGTH_LIMIT elements, `*res` is set to LIMIT_EXCEEDED and OK is returned without touching `*elems`.
//   2. Normalize `args.offset` / `args.count` against the collection size. A count of -1 (or anything below it)
//      means "everything from offset"; an offset at or past the end selects nothing.
//   3. Load the elements. With `dontsort`, only the requested window is kept; otherwise everything is loaded.
//   4. Unless `dontsort`, compute a sort weight per element (the element itself, or the value found through
//      `args.sortby`), sort with RedisSortObject::SortCompare and keep the requested window.
//   5. For each selected element append either the element itself (no GET patterns) or one entry per GET
//      pattern, using std::nullopt when the pattern lookup yields nothing.
//   6. If `args.storekey` is non-empty, write everything in `*elems` to the list at `storekey`
//      (missing values are stored as empty strings).
//
// `*res` is only written on the early-exit paths (LIMIT_EXCEEDED, UNKNOWN_TYPE, DOUBLE_CONVERT_ERROR).
// NOTE(review): the caller presumably initializes `*res` to a success value before calling -- confirm.
//
// Returns a non-OK status only when one of the underlying storage operations fails.
rocksdb::Status Database::Sort(engine::Context &ctx, RedisType type, const std::string &key, const SortArgument &args,
                               std::vector<std::optional<std::string>> *elems, SortResult *res) {
  // Obtain the length of the object to sort.
  const std::string ns_key = AppendNamespacePrefix(key);
  Metadata metadata(type, false);
  auto s = GetMetadata(ctx, {type}, ns_key, &metadata);
  if (!s.ok()) return s;

  if (metadata.size > SORT_LENGTH_LIMIT) {
    *res = SortResult::LIMIT_EXCEEDED;
    return rocksdb::Status::OK();
  }
  const auto vectorlen = static_cast<int>(metadata.size);

  // Adjust the offset and count of the limit.
  // An empty collection is treated like an out-of-range offset so that std::clamp is never called with
  // lo > hi (which would be undefined behavior for a negative args.offset).
  const bool nothing_selected = vectorlen == 0 || args.offset >= vectorlen;
  const int offset = nothing_selected ? 0 : std::clamp(args.offset, 0, vectorlen - 1);
  int count = nothing_selected ? 0 : std::clamp(args.count, -1, vectorlen - offset);
  if (count == -1) count = vectorlen - offset;

  // Get the elements that need to be sorted
  std::vector<std::string> str_vec;
  if (count != 0) {
    if (type == RedisType::kRedisList) {
      auto list_db = redis::List(storage_, namespace_);

      if (args.dontsort) {
        if (args.desc) {
          // Fetch the window from the other end of the list, then flip it.
          // NOTE(review): presumably negative indexes count from the tail, as in LRANGE -- confirm.
          s = list_db.Range(ctx, key, -count - offset, -1 - offset, &str_vec);
          if (!s.ok()) return s;
          std::reverse(str_vec.begin(), str_vec.end());
        } else {
          s = list_db.Range(ctx, key, offset, offset + count - 1, &str_vec);
          if (!s.ok()) return s;
        }
      } else {
        s = list_db.Range(ctx, key, 0, -1, &str_vec);
        if (!s.ok()) return s;
      }
    } else if (type == RedisType::kRedisSet) {
      auto set_db = redis::Set(storage_, namespace_);
      s = set_db.Members(ctx, key, &str_vec);
      if (!s.ok()) return s;

      if (args.dontsort) {
        // Clamp the window to what was actually loaded so the iterators below can never leave the vector,
        // even if the member count differs from the size recorded in the metadata.
        const int available = static_cast<int>(str_vec.size());
        const int first = std::min(offset, available);
        const int last = std::min(offset + count, available);
        str_vec = std::vector(std::make_move_iterator(str_vec.begin() + first),
                              std::make_move_iterator(str_vec.begin() + last));
      }
    } else if (type == RedisType::kRedisZSet) {
      auto zset_db = redis::ZSet(storage_, namespace_);
      std::vector<MemberScore> member_scores;

      if (args.dontsort) {
        RangeRankSpec spec;
        spec.start = offset;
        spec.stop = offset + count - 1;
        spec.reversed = args.desc;
        s = zset_db.RangeByRank(ctx, key, spec, &member_scores, nullptr);
      } else {
        s = zset_db.GetAllMemberScores(ctx, key, &member_scores);
      }
      if (!s.ok()) return s;

      // Only the member names are needed from here on.
      str_vec.reserve(member_scores.size());
      for (auto &member_score : member_scores) {
        str_vec.emplace_back(std::move(member_score.member));
      }
    } else {
      *res = SortResult::UNKNOWN_TYPE;
      // `s` still holds the successful metadata lookup status at this point.
      return s;
    }
  }

  // Hand the loaded strings over to the sort objects; `str_vec` must not be read after this loop.
  std::vector<RedisSortObject> sort_vec(str_vec.size());
  for (size_t i = 0; i < str_vec.size(); ++i) {
    sort_vec[i].obj = std::move(str_vec[i]);
  }

  // Sort by BY, ALPHA, ASC/DESC
  if (!args.dontsort) {
    for (auto &item : sort_vec) {
      std::string byval;
      if (!args.sortby.empty()) {
        auto lookup = lookupKeyByPattern(ctx, args.sortby, item.obj);
        // No weight found for this element: leave `v` at its default value.
        if (!lookup.has_value()) continue;
        byval = std::move(lookup.value());
      } else {
        byval = item.obj;
      }

      if (args.alpha && !args.sortby.empty()) {
        item.v = std::move(byval);
      } else if (!args.alpha && !byval.empty()) {
        auto double_byval = ParseFloat<double>(byval);
        if (!double_byval) {
          *res = SortResult::DOUBLE_CONVERT_ERROR;
          return rocksdb::Status::OK();
        }
        item.v = *double_byval;
      }
    }

    std::sort(sort_vec.begin(), sort_vec.end(), [&args](const RedisSortObject &a, const RedisSortObject &b) {
      return RedisSortObject::SortCompare(a, b, args);
    });

    // Gets the element specified by Limit
    if (offset != 0 || count != vectorlen) {
      // `sort_vec` is empty when count == 0 (nothing was loaded), yet `offset` may still be non-zero, so
      // the window has to be clamped to the real size before doing any iterator arithmetic.
      const int available = static_cast<int>(sort_vec.size());
      const int first = std::min(offset, available);
      const int last = std::min(offset + count, available);
      sort_vec = std::vector(std::make_move_iterator(sort_vec.begin() + first),
                             std::make_move_iterator(sort_vec.begin() + last));
    }
  }

  // Perform storage
  for (auto &elem : sort_vec) {
    if (args.getpatterns.empty()) {
      // `sort_vec` is not used after this loop, so the element can be moved out.
      elems->emplace_back(std::move(elem.obj));
      continue;
    }
    for (const std::string &pattern : args.getpatterns) {
      // An empty optional is appended as-is when the pattern lookup yields nothing.
      std::optional<std::string> val = lookupKeyByPattern(ctx, pattern, elem.obj);
      elems->emplace_back(std::move(val));
    }
  }

  if (!args.storekey.empty()) {
    // Missing GET results are stored as empty strings.
    std::vector<std::string> store_elems;
    store_elems.reserve(elems->size());
    for (const auto &e : *elems) {
      store_elems.emplace_back(e.value_or(""));
    }
    redis::List list_db(storage_, namespace_);
    // NOTE(review): Trim with start -1 / stop 0 presumably empties any existing list at storekey before
    // the new elements are pushed -- confirm against redis::List::Trim.
    s = list_db.Trim(ctx, args.storekey, -1, 0);

    if (!s.ok()) return s;
    uint64_t new_size = 0;
    s = list_db.Push(ctx, args.storekey, std::vector<Slice>(store_elems.cbegin(), store_elems.cend()), false,
                     &new_size);
    if (!s.ok()) return s;
  }

  return rocksdb::Status::OK();
}