in tensorflow_recommenders_addons/dynamic_embedding/core/kernels/redis_impl/redis_cluster_connection_pool.hpp [207:299]
virtual std::vector<std::string> GetKeyBucketsAndOptimizerParamsWithName(
const std::string &keys_prefix_name,
const bool only_get_buckets) override {
std::vector<std::string> keys_prefix_name_slices_in_redis;
std::string redis_command;
// get cluster info
auto cmd = [](::sw::redis::Connection &connection,
::sw::redis::StringView hkey) {
connection.send("CLUSTER SLOTS");
};
::sw::redis::StringView _hkey("0");
std::unique_ptr<redisReply, ::sw::redis::ReplyDeleter> reply;
try {
reply = redis_conn_read->command(cmd, _hkey);
} catch (const std::exception &err) {
LOG(ERROR)
<< "RedisHandler error in "
"GetKeyBucketsAndOptimizerParamsWithName(CLUSTER SLOTS) -- "
<< err.what();
return keys_prefix_name_slices_in_redis;
}
std::vector<std::pair<std::string, long long>> ip_port_set;
size_t servers_num = reply->elements;
ip_port_set.reserve(servers_num);
for (size_t i = 0; i < servers_num; ++i) {
ip_port_set.emplace_back(std::make_pair<std::string, long long>(
std::string(reply->element[i]->element[2]->element[0]->str,
reply->element[i]->element[2]->element[0]->len),
std::move(reply->element[i]->element[2]->element[1]->integer)));
}
std::sort(ip_port_set.begin(), ip_port_set.end());
ip_port_set.erase(std::unique(ip_port_set.begin(), ip_port_set.end()),
ip_port_set.end());
std::unique_ptr<Redis> redis_client;
std::unique_ptr<redisReply, ::sw::redis::ReplyDeleter> reply_server;
ConnectionOptions connection_options;
long long cursor = 0;
const redisReply *set_reply;
keys_prefix_name_slices_in_redis.reserve(
redis_connection_params.storage_slice);
for (size_t i = 0; i < ip_port_set.size(); ++i) {
connection_options.host = ip_port_set[i].first; // Required.
connection_options.port =
ip_port_set[i].second; // Optional. The default port is 6379.
connection_options.user = redis_connection_params.redis_user;
connection_options.password =
redis_connection_params
.redis_password; // Optional. No redis_password by default.
connection_options.db =
redis_connection_params
.redis_db; // Optional. Use the 0th database by default.
redis_client.reset(new Redis(connection_options));
auto cmd_per_server = [](::sw::redis::Connection &connection,
const char *str) { connection.send(str); };
reply_server.reset();
cursor = 0;
while (true) {
if (only_get_buckets) {
redis_command = "SCAN " + std::to_string(cursor) + " MATCH " +
keys_prefix_name + "{[0123456789]*}";
} else {
redis_command = "SCAN " + std::to_string(cursor) + " MATCH " +
keys_prefix_name + "*{[0123456789]*}";
}
try {
reply_server =
redis_client->command(cmd_per_server, redis_command.data());
} catch (const std::exception &err) {
LOG(ERROR)
<< "RedisHandler error "
"GetKeyBucketsAndOptimizerParamsWithName(SCAN) in for IP "
<< ip_port_set[i].first << " -- " << err.what();
}
if (reply_server->element[0]->type == REDIS_REPLY_STRING) {
// #define REDIS_REPLY_STRING 1
cursor = std::atoll(reply_server->element[0]->str);
}
if (reply_server->element[1]->type == REDIS_REPLY_ARRAY) {
set_reply = reply_server->element[1];
for (size_t i = 0; i < set_reply->elements; ++i) {
keys_prefix_name_slices_in_redis.emplace_back(std::string(
set_reply->element[i]->str, set_reply->element[i]->len));
}
}
if (cursor == 0) {
break;
}
}
}
return keys_prefix_name_slices_in_redis;
}