virtual std::vector GetKeyBucketsAndOptimizerParamsWithName()

in tensorflow_recommenders_addons/dynamic_embedding/core/kernels/redis_impl/redis_cluster_connection_pool.hpp [207:299]


  virtual std::vector<std::string> GetKeyBucketsAndOptimizerParamsWithName(
      const std::string &keys_prefix_name,
      const bool only_get_buckets) override {
    std::vector<std::string> keys_prefix_name_slices_in_redis;
    std::string redis_command;
    // get cluster info
    auto cmd = [](::sw::redis::Connection &connection,
                  ::sw::redis::StringView hkey) {
      connection.send("CLUSTER SLOTS");
    };
    ::sw::redis::StringView _hkey("0");
    std::unique_ptr<redisReply, ::sw::redis::ReplyDeleter> reply;
    try {
      reply = redis_conn_read->command(cmd, _hkey);
    } catch (const std::exception &err) {
      LOG(ERROR)
          << "RedisHandler error in "
             "GetKeyBucketsAndOptimizerParamsWithName(CLUSTER SLOTS) --  "
          << err.what();
      return keys_prefix_name_slices_in_redis;
    }

    std::vector<std::pair<std::string, long long>> ip_port_set;
    size_t servers_num = reply->elements;
    ip_port_set.reserve(servers_num);
    for (size_t i = 0; i < servers_num; ++i) {
      ip_port_set.emplace_back(std::make_pair<std::string, long long>(
          std::string(reply->element[i]->element[2]->element[0]->str,
                      reply->element[i]->element[2]->element[0]->len),
          std::move(reply->element[i]->element[2]->element[1]->integer)));
    }
    std::sort(ip_port_set.begin(), ip_port_set.end());
    ip_port_set.erase(std::unique(ip_port_set.begin(), ip_port_set.end()),
                      ip_port_set.end());

    std::unique_ptr<Redis> redis_client;
    std::unique_ptr<redisReply, ::sw::redis::ReplyDeleter> reply_server;
    ConnectionOptions connection_options;
    long long cursor = 0;
    const redisReply *set_reply;
    keys_prefix_name_slices_in_redis.reserve(
        redis_connection_params.storage_slice);
    for (size_t i = 0; i < ip_port_set.size(); ++i) {
      connection_options.host = ip_port_set[i].first;  // Required.
      connection_options.port =
          ip_port_set[i].second;  // Optional. The default port is 6379.
      connection_options.user = redis_connection_params.redis_user;
      connection_options.password =
          redis_connection_params
              .redis_password;  // Optional. No redis_password by default.
      connection_options.db =
          redis_connection_params
              .redis_db;  // Optional. Use the 0th database by default.
      redis_client.reset(new Redis(connection_options));
      auto cmd_per_server = [](::sw::redis::Connection &connection,
                               const char *str) { connection.send(str); };
      reply_server.reset();
      cursor = 0;
      while (true) {
        if (only_get_buckets) {
          redis_command = "SCAN " + std::to_string(cursor) + " MATCH " +
                          keys_prefix_name + "{[0123456789]*}";
        } else {
          redis_command = "SCAN " + std::to_string(cursor) + " MATCH " +
                          keys_prefix_name + "*{[0123456789]*}";
        }
        try {
          reply_server =
              redis_client->command(cmd_per_server, redis_command.data());
        } catch (const std::exception &err) {
          LOG(ERROR)
              << "RedisHandler error "
                 "GetKeyBucketsAndOptimizerParamsWithName(SCAN) in for IP "
              << ip_port_set[i].first << " --  " << err.what();
        }
        if (reply_server->element[0]->type == REDIS_REPLY_STRING) {
          // #define REDIS_REPLY_STRING 1
          cursor = std::atoll(reply_server->element[0]->str);
        }
        if (reply_server->element[1]->type == REDIS_REPLY_ARRAY) {
          set_reply = reply_server->element[1];
          for (size_t i = 0; i < set_reply->elements; ++i) {
            keys_prefix_name_slices_in_redis.emplace_back(std::string(
                set_reply->element[i]->str, set_reply->element[i]->len));
          }
        }
        if (cursor == 0) {
          break;
        }
      }
    }
    return keys_prefix_name_slices_in_redis;
  }