rocksdb::Status Database::Scan()

in src/storage/redis_db.cc [318:417]


// Scans the metadata column family for user keys and appends matches to `keys`.
//
// Parameters:
//   ctx         - engine context used to create the iterator and supply read options.
//   cursor      - user key returned as `end_cursor` by a previous call; empty starts a new scan.
//   limit       - the main loop stops collecting once this many keys were appended by this call.
//   prefix      - user-key prefix that every returned key must start with (may be empty).
//   suffix_glob - glob pattern matched against the part of the user key that follows `prefix`.
//   keys        - output; matching user keys are appended (the vector is not cleared here).
//   end_cursor  - output; cleared on entry, then set to the user key to resume from when the scan
//                 stopped early. It stays empty when the scan ran to completion.
//   type        - when not kRedisNone, only keys whose metadata type equals `type` are returned.
//
// Returns the iterator's error status if iteration failed, otherwise OK.
//
// When slot ids are encoded into keys and `prefix` is non-empty, keys sharing the prefix are spread
// over the slots, so the scan re-seeks slot by slot (at most HASH_SLOTS_MAX_ITERATIONS slots past the
// starting slot per call) instead of doing one contiguous range scan.
rocksdb::Status Database::Scan(engine::Context &ctx, const std::string &cursor, uint64_t limit,
                               const std::string &prefix, const std::string &suffix_glob,
                               std::vector<std::string> *keys, std::string *end_cursor, RedisType type) {
  end_cursor->clear();
  uint64_t cnt = 0;
  uint16_t slot_start = 0;
  std::string ns_prefix;
  std::string user_key;

  auto iter = util::UniqueIterator(ctx, ctx.GetReadOptions(), metadata_cf_handle_);

  // True when the entry under the iterator decodes as metadata, has the requested type (if any)
  // and is not expired. Shared by the main loop and the slot-budget fallback so that both apply
  // exactly the same filters.
  auto passes_metadata_filter = [&iter, type]() {
    Metadata metadata(kRedisNone, false);
    auto s = metadata.Decode(iter->value());
    if (!s.ok()) return false;
    if (type != kRedisNone && type != metadata.Type()) return false;
    return !metadata.Expired();
  };

  std::string ns_cursor = AppendNamespacePrefix(cursor);
  if (storage_->IsSlotIdEncoded()) {
    slot_start = cursor.empty() ? 0 : GetSlotIdFromKey(cursor);
    ns_prefix = ComposeNamespaceKey(namespace_, "", false);
    if (!prefix.empty()) {
      // Restrict the range to "<namespace><slot><prefix>" for the slot the cursor belongs to.
      PutFixed16(&ns_prefix, slot_start);
      ns_prefix.append(prefix);
    }
  } else {
    ns_prefix = AppendNamespacePrefix(prefix);
  }

  if (!cursor.empty()) {
    iter->Seek(ns_cursor);
    // Seek() lands on the first key at or after the target. Only step over the entry when it is
    // the cursor key itself: if that key has been removed since the previous call, the iterator
    // already points at the next unseen key and skipping it would silently drop it from the scan.
    // Assumes AppendNamespacePrefix(cursor) reproduces the stored metadata key - TODO confirm.
    if (iter->Valid() && iter->key().compare(ns_cursor) == 0) {
      iter->Next();
    }
  } else if (ns_prefix.empty()) {
    iter->SeekToFirst();
  } else {
    iter->Seek(ns_prefix);
  }

  uint16_t slot_id = slot_start;
  while (true) {
    for (; iter->Valid() && cnt < limit; iter->Next()) {
      if (!ns_prefix.empty() && !iter->key().starts_with(ns_prefix)) {
        break;
      }
      if (!passes_metadata_filter()) continue;

      std::tie(std::ignore, user_key) = ExtractNamespaceKey<std::string>(iter->key(), storage_->IsSlotIdEncoded());

      if (!util::StringMatch(suffix_glob, user_key.substr(prefix.size()))) {
        continue;
      }
      keys->emplace_back(user_key);
      cnt++;
    }

    if (auto s = iter->status(); !s.ok()) {
      return s;
    }

    // Single contiguous range (no slot encoding, or no prefix to re-seek for): one pass is enough.
    if (!storage_->IsSlotIdEncoded() || prefix.empty()) {
      if (!keys->empty() && cnt >= limit) {
        end_cursor->append(user_key);
      }
      break;
    }

    if (cnt >= limit) {
      end_cursor->append(user_key);
      break;
    }

    // Current slot is exhausted; stop when there is no further slot to visit.
    if (++slot_id >= HASH_SLOTS_SIZE) {
      break;
    }

    // Slot budget for this call is used up: hand back a cursor so the caller can continue later.
    if (slot_id > slot_start + HASH_SLOTS_MAX_ITERATIONS) {
      if (keys->empty()) {
        if (iter->Valid()) {
          // The iterator rests on the first entry that no longer matched ns_prefix. Use it as the
          // resume point, and return it as a result only if it passes the same checks as the main
          // loop.
          // NOTE(review): nothing here verifies that this entry belongs to namespace_ - confirm
          // whether an entry from another namespace can be reached at this point.
          std::tie(std::ignore, user_key) = ExtractNamespaceKey<std::string>(iter->key(), storage_->IsSlotIdEncoded());
          // Bounds-safe prefix test: user_key may be shorter than prefix here.
          const bool has_prefix = user_key.compare(0, prefix.size(), prefix) == 0;
          if (has_prefix && passes_metadata_filter() &&
              util::StringMatch(suffix_glob, user_key.substr(prefix.size()))) {
            keys->emplace_back(user_key);
          }

          end_cursor->append(user_key);
        }
      } else {
        end_cursor->append(user_key);
      }
      break;
    }

    // Move on to the same prefix inside the next slot.
    ns_prefix = ComposeNamespaceKey(namespace_, "", false);
    PutFixed16(&ns_prefix, slot_id);
    ns_prefix.append(prefix);
    iter->Seek(ns_prefix);
  }
  return rocksdb::Status::OK();
}