common/dbconnector.cpp (885 lines of code) (raw):

#include <string.h> #include <stdint.h> #include <vector> #include <unistd.h> #include <errno.h> #include <system_error> #include <fstream> #include <nlohmann/json.hpp> #include <set> #include "logger.h" #include "common/dbconnector.h" #include "common/redisreply.h" #include "common/redispipeline.h" #include "common/pubsub.h" using json = nlohmann::json; using namespace std; using namespace swss; void SonicDBConfig::parseDatabaseConfig(const string &file, std::map<std::string, RedisInstInfo> &inst_entry, std::unordered_map<std::string, SonicDBInfo> &db_entry, std::unordered_map<int, std::string> &separator_entry) { ifstream i(file); if (i.good()) { try { json j; i >> j; for (auto it = j["INSTANCES"].begin(); it!= j["INSTANCES"].end(); it++) { string instName = it.key(); string socket; auto path = it.value().find("unix_socket_path"); if (path != it.value().end()) { socket = *path; } string hostname = it.value().at("hostname"); int port = it.value().at("port"); inst_entry[instName] = {socket, hostname, port}; } for (auto it = j["DATABASES"].begin(); it!= j["DATABASES"].end(); it++) { string dbName = it.key(); string instName = it.value().at("instance"); int dbId = it.value().at("id"); string separator = it.value().at("separator"); db_entry[dbName] = {instName, dbId, separator}; separator_entry.emplace(dbId, separator); } } catch (domain_error& e) { SWSS_LOG_ERROR("key doesn't exist in json object, NULL value has no iterator >> %s\n", e.what()); throw runtime_error("key doesn't exist in json object, NULL value has no iterator >> " + string(e.what())); } catch (exception &e) { SWSS_LOG_ERROR("Sonic database config file syntax error >> %s\n", e.what()); throw runtime_error("Sonic database config file syntax error >> " + string(e.what())); } } else { SWSS_LOG_ERROR("Sonic database config file doesn't exist at %s\n", file.c_str()); throw runtime_error("Sonic database config file doesn't exist at " + file); } } void SonicDBConfig::initializeGlobalConfig(const string &file) { std::string dir_name; std::lock_guard<std::recursive_mutex> guard(m_db_info_mutex); SWSS_LOG_ENTER(); if (m_global_init) { SWSS_LOG_ERROR("SonicDBConfig Global config is already initialized"); return; } ifstream i(file); if (i.good()) { // Get the directory name from the file path given as input. std::string::size_type pos = file.rfind("/"); if( pos != std::string::npos) { dir_name = file.substr(0,pos+1); } try { json j; i >> j; for (auto& element : j["INCLUDES"]) { std::unordered_map<std::string, SonicDBInfo> db_entry; std::map<std::string, RedisInstInfo> inst_entry; std::unordered_map<int, std::string> separator_entry; std::string local_file = dir_name; local_file.append(element["include"]); SonicDBKey key; if(!element["namespace"].empty()) { key.netns = element["namespace"]; } if (!element["container_name"].empty()) { key.containerName = element["container_name"]; } // If database_config.json is already initlized via SonicDBConfig::initialize // skip initializing it here again. if (key.isEmpty() && m_init) { continue; } parseDatabaseConfig(local_file, inst_entry, db_entry, separator_entry); m_inst_info[key] = inst_entry; m_db_info[key] = db_entry; m_db_separator[key] = separator_entry; if(key.isEmpty()) { // Make regular init also done m_init = true; } } } catch (domain_error& e) { SWSS_LOG_ERROR("key doesn't exist in json object, NULL value has no iterator >> %s\n", e.what()); throw runtime_error("key doesn't exist in json object, NULL value has no iterator >> " + string(e.what())); } catch (exception &e) { SWSS_LOG_ERROR("Sonic database config file syntax error >> %s\n", e.what()); throw runtime_error("Sonic database config file syntax error >> " + string(e.what())); } } else { SWSS_LOG_ERROR("Sonic database config global file doesn't exist at %s\n", file.c_str()); } // Set it as the global config file is already parsed and init done. m_global_init = true; } void SonicDBConfig::initialize(const string &file) { std::unordered_map<std::string, SonicDBInfo> db_entry; std::map<std::string, RedisInstInfo> inst_entry; std::unordered_map<int, std::string> separator_entry; std::lock_guard<std::recursive_mutex> guard(m_db_info_mutex); SWSS_LOG_ENTER(); if (m_init) { SWSS_LOG_ERROR("SonicDBConfig already initialized"); throw runtime_error("SonicDBConfig already initialized"); } SonicDBKey empty_key; parseDatabaseConfig(file, inst_entry, db_entry, separator_entry); m_inst_info.emplace(empty_key, std::move(inst_entry)); m_db_info.emplace(empty_key, std::move(db_entry)); m_db_separator.emplace(empty_key, std::move(separator_entry)); // Set it as the config file is already parsed and init done. m_init = true; } // This API is used to reset the SonicDBConfig class. // And then user can call initialize with different config file. void SonicDBConfig::reset() { std::lock_guard<std::recursive_mutex> guard(m_db_info_mutex); m_init = false; m_global_init = false; m_inst_info.clear(); m_db_info.clear(); m_db_separator.clear(); } void SonicDBConfig::validateNamespace(const string &netns) { std::lock_guard<std::recursive_mutex> guard(m_db_info_mutex); SWSS_LOG_ENTER(); // With valid namespace input and database_global.json is not loaded, ask user to initializeGlobalConfig first if(!netns.empty()) { // If global initialization is not done, ask user to initialize global DB Config first. if (!m_global_init) { SWSS_LOG_THROW("Initialize global DB config using API SonicDBConfig::initializeGlobalConfig"); } // Check if the namespace is valid, check if this is a key in either of this map for (const auto &entry: m_inst_info) { if (entry.first.netns == netns) { return; } } SWSS_LOG_THROW("Namespace %s is not a valid namespace name in config file", netns.c_str()); } } SonicDBInfo& SonicDBConfig::getDbInfo(const std::string &dbName, const SonicDBKey &key) { std::lock_guard<std::recursive_mutex> guard(m_db_info_mutex); SWSS_LOG_ENTER(); if (!m_init) initialize(DEFAULT_SONIC_DB_CONFIG_FILE); if (!key.isEmpty()) { if (!m_global_init) { SWSS_LOG_THROW("Initialize global DB config using API SonicDBConfig::initializeGlobalConfig"); } } auto foundEntry = m_db_info.find(key); if (foundEntry == m_db_info.end()) { string msg = "Key " + key.toString() + " is not a valid key name in config file"; SWSS_LOG_ERROR("%s", msg.c_str()); throw out_of_range(msg); } auto& infos = foundEntry->second; auto foundDb = infos.find(dbName); if (foundDb == infos.end()) { string msg = "Failed to find " + dbName + " database in " + key.toString() + " key"; SWSS_LOG_ERROR("%s", msg.c_str()); throw out_of_range(msg); } return foundDb->second; } RedisInstInfo& SonicDBConfig::getRedisInfo(const std::string &dbName, const SonicDBKey &key) { std::lock_guard<std::recursive_mutex> guard(m_db_info_mutex); SWSS_LOG_ENTER(); if (!m_init) initialize(DEFAULT_SONIC_DB_CONFIG_FILE); if (!key.isEmpty()) { if (!m_global_init) { SWSS_LOG_THROW("Initialize global DB config using API SonicDBConfig::initializeGlobalConfig"); } } auto foundEntry = m_inst_info.find(key); if (foundEntry == m_inst_info.end()) { string msg = "Key " + key.toString() + " is not a valid key name in Redis instances in config file"; SWSS_LOG_ERROR("%s", msg.c_str()); throw out_of_range(msg); } auto& redisInfos = foundEntry->second; auto foundRedis = redisInfos.find(getDbInst(dbName, key)); if (foundRedis == redisInfos.end()) { string msg = "Failed to find the Redis instance for " + dbName + " database in " + key.toString() + " key"; SWSS_LOG_ERROR("%s", msg.c_str()); throw out_of_range(msg); } return foundRedis->second; } string SonicDBConfig::getDbInst(const string &dbName, const string &netns, const std::string &containerName) { SonicDBKey key; key.netns = netns; key.containerName = containerName; return getDbInst(dbName, key); } string SonicDBConfig::getDbInst(const std::string &dbName, const SonicDBKey &key) { return getDbInfo(dbName, key).instName; } int SonicDBConfig::getDbId(const string &dbName, const string &netns, const std::string &containerName) { SonicDBKey key; key.netns = netns; key.containerName = containerName; return getDbId(dbName, key); } int SonicDBConfig::getDbId(const std::string &dbName, const SonicDBKey &key) { return getDbInfo(dbName, key).dbId; } string SonicDBConfig::getSeparator(const string &dbName, const string &netns, const std::string &containerName) { SonicDBKey key; key.netns = netns; key.containerName = containerName; return getSeparator(dbName, key); } string SonicDBConfig::getSeparator(const std::string &dbName, const SonicDBKey &key) { return getDbInfo(dbName, key).separator; } string SonicDBConfig::getSeparator(int dbId, const string &netns, const std::string &containerName) { SonicDBKey key; key.netns = netns; key.containerName = containerName; return getSeparator(dbId, key); } std::string SonicDBConfig::getSeparator(int dbId, const SonicDBKey &key) { std::lock_guard<std::recursive_mutex> guard(m_db_info_mutex); if (!m_init) initialize(DEFAULT_SONIC_DB_CONFIG_FILE); if (!key.isEmpty()) { if (!m_global_init) { SWSS_LOG_THROW("Initialize global DB config using API SonicDBConfig::initializeGlobalConfig"); } } auto foundEntry = m_db_separator.find(key); if (foundEntry == m_db_separator.end()) { string msg = "Key " + key.toString() + " is not a valid key name in config file"; SWSS_LOG_ERROR("%s", msg.c_str()); throw out_of_range(msg); } auto seps = foundEntry->second; auto foundDb = seps.find(dbId); if (foundDb == seps.end()) { string msg = "Failed to find " + to_string(dbId) + " database in " + key.toString() + " key"; SWSS_LOG_ERROR("%s", msg.c_str()); throw out_of_range(msg); } return foundDb->second; } string SonicDBConfig::getSeparator(const DBConnector* db) { if (!db) { throw std::invalid_argument("db cannot be null"); } string dbName = db->getDbName(); auto key = db->getDBKey(); if (dbName.empty()) { return getSeparator(db->getDbId(), key); } else { return getSeparator(dbName, key); } } string SonicDBConfig::getDbSock(const string &dbName, const string &netns, const std::string &containerName) { SonicDBKey key; key.netns = netns; key.containerName = containerName; return getDbSock(dbName, key); } string SonicDBConfig::SonicDBConfig::getDbSock(const string &dbName, const SonicDBKey &key) { return getRedisInfo(dbName, key).unixSocketPath; } string SonicDBConfig::getDbHostname(const string &dbName, const string &netns, const std::string &containerName) { SonicDBKey key; key.netns = netns; key.containerName = containerName; return getDbHostname(dbName, key); } string SonicDBConfig::getDbHostname(const string &dbName, const SonicDBKey &key) { return getRedisInfo(dbName, key).hostname; } int SonicDBConfig::getDbPort(const string &dbName, const string &netns, const std::string &containerName) { SonicDBKey key; key.netns = netns; key.containerName = containerName; return getDbPort(dbName, key); } int SonicDBConfig::getDbPort(const string &dbName, const SonicDBKey &key) { return getRedisInfo(dbName, key).port; } vector<string> SonicDBConfig::getNamespaces() { set<string> list; std::lock_guard<std::recursive_mutex> guard(m_db_info_mutex); if (!m_init) initialize(DEFAULT_SONIC_DB_CONFIG_FILE); // This API returns back all namespaces including '' representing global ns. for (auto it = m_inst_info.cbegin(); it != m_inst_info.cend(); ++it) { list.insert(it->first.netns); } return vector<string>(list.begin(), list.end()); } vector<SonicDBKey> SonicDBConfig::getDbKeys() { vector<SonicDBKey> keys; std::lock_guard<std::recursive_mutex> guard(m_db_info_mutex); if (!m_init) initialize(DEFAULT_SONIC_DB_CONFIG_FILE); // This API returns back all db keys. for (auto it = m_inst_info.cbegin(); it != m_inst_info.cend(); ++it) { keys.push_back(it->first); } return keys; } std::vector<std::string> SonicDBConfig::getDbList(const std::string &netns, const std::string &containerName) { SonicDBKey key; key.netns = netns; key.containerName = containerName; return getDbList(key); } std::vector<std::string> SonicDBConfig::getDbList(const SonicDBKey &key) { std::lock_guard<std::recursive_mutex> guard(m_db_info_mutex); if (!m_init) { initialize(); } validateNamespace(key.netns); std::vector<std::string> dbNames; for (auto& imap: m_db_info.at(key)) { dbNames.push_back(imap.first); } return dbNames; } map<string, RedisInstInfo> SonicDBConfig::getInstanceList(const std::string &netns, const std::string &containerName) { SonicDBKey key; key.netns = netns; key.containerName = containerName; return getInstanceList(key); } map<string, RedisInstInfo> SonicDBConfig::getInstanceList(const SonicDBKey &key) { if (!m_init) { initialize(); } validateNamespace(key.netns); map<string, RedisInstInfo> result; auto iterator = m_inst_info.find(key); if (iterator != m_inst_info.end()) { return iterator->second; } return map<string, RedisInstInfo>(); } constexpr const char *SonicDBConfig::DEFAULT_SONIC_DB_CONFIG_FILE; constexpr const char *SonicDBConfig::DEFAULT_SONIC_DB_GLOBAL_CONFIG_FILE; std::recursive_mutex SonicDBConfig::m_db_info_mutex; unordered_map<SonicDBKey, std::map<std::string, RedisInstInfo>, SonicDBKeyHash> SonicDBConfig::m_inst_info; unordered_map<SonicDBKey, std::unordered_map<std::string, SonicDBInfo>, SonicDBKeyHash> SonicDBConfig::m_db_info; unordered_map<SonicDBKey, std::unordered_map<int, std::string>, SonicDBKeyHash> SonicDBConfig::m_db_separator; bool SonicDBConfig::m_init = false; bool SonicDBConfig::m_global_init = false; constexpr const char *RedisContext::DEFAULT_UNIXSOCKET; RedisContext::~RedisContext() { if(m_conn) redisFree(m_conn); } RedisContext::RedisContext() : m_conn(NULL) { } RedisContext::RedisContext(const RedisContext &other) { auto octx = other.getContext(); const char *unixPath = octx->unix_sock.path; if (unixPath) { initContext(unixPath, octx->timeout); } else { initContext(octx->tcp.host, octx->tcp.port, octx->timeout); } } void RedisContext::initContext(const char *host, int port, const timeval *tv) { if (tv) { m_conn = redisConnectWithTimeout(host, port, *tv); } else { m_conn = redisConnect(host, port); } if (m_conn->err) throw system_error(make_error_code(errc::address_not_available), "Unable to connect to redis - " + std::string(m_conn->errstr) + "(" + std::to_string(m_conn->err) + ")"); } void RedisContext::initContext(const char *path, const timeval *tv) { if (tv) { m_conn = redisConnectUnixWithTimeout(path, *tv); } else { m_conn = redisConnectUnix(path); } if (m_conn->err) throw system_error(make_error_code(errc::address_not_available), "Unable to connect to redis (unix-socket) - " + std::string(m_conn->errstr) + "(" + std::to_string(m_conn->err) + ")"); } redisContext *RedisContext::getContext() const { return m_conn; } void RedisContext::setContext(redisContext *ctx) { m_conn = ctx; } void RedisContext::setClientName(const string& clientName) { string command("CLIENT SETNAME "); command += clientName; RedisReply r(this, command, REDIS_REPLY_STATUS); r.checkStatusOK(); } string RedisContext::getClientName() { string command("CLIENT GETNAME"); RedisReply r(this, command); auto ctx = r.getContext(); if (ctx->type == REDIS_REPLY_STRING) { return r.getReply<std::string>(); } else { if (ctx->type != REDIS_REPLY_NIL) SWSS_LOG_ERROR("Unable to obtain Redis client name"); return ""; } } void DBConnector::select(DBConnector *db) { string select("SELECT "); select += to_string(db->getDbId()); RedisReply r(db, select, REDIS_REPLY_STATUS); r.checkStatusOK(); } DBConnector::DBConnector(const DBConnector &other) : RedisContext(other) , m_dbId(other.m_dbId) , m_dbName(other.m_dbName) , m_key(other.m_key) { select(this); } DBConnector::DBConnector(int dbId, const RedisContext& ctx) : RedisContext(ctx) , m_dbId(dbId) { select(this); } static struct timeval ms_to_timeval(unsigned int ms) { return { .tv_sec = (time_t)ms / 1000, .tv_usec = ((suseconds_t)ms % 1000) * 1000 }; } DBConnector::DBConnector(int dbId, const string& hostname, int port, unsigned int timeout_ms) : m_dbId(dbId) { struct timeval tv = ms_to_timeval(timeout_ms); struct timeval *ptv = timeout_ms ? &tv : NULL; initContext(hostname.c_str(), port, ptv); select(this); } DBConnector::DBConnector(int dbId, const string& unixPath, unsigned int timeout_ms) : m_dbId(dbId) { struct timeval tv = ms_to_timeval(timeout_ms); struct timeval *ptv = timeout_ms ? &tv : NULL; initContext(unixPath.c_str(), ptv); select(this); } DBConnector::DBConnector(const string& dbName, unsigned int timeout_ms, bool isTcpConn, const string& netns) : DBConnector(dbName, timeout_ms, isTcpConn, SonicDBKey(netns)) { } DBConnector::DBConnector(const string& dbName, unsigned int timeout_ms, bool isTcpConn, const SonicDBKey &key) : m_dbId(SonicDBConfig::getDbId(dbName, key)) , m_dbName(dbName) , m_key(key) { struct timeval tv = ms_to_timeval(timeout_ms); struct timeval *ptv = timeout_ms ? &tv : NULL; if (isTcpConn) { initContext(SonicDBConfig::getDbHostname(dbName, m_key).c_str(), SonicDBConfig::getDbPort(dbName, m_key), ptv); } else { initContext(SonicDBConfig::getDbSock(dbName, m_key).c_str(), ptv); } select(this); } DBConnector::DBConnector(const string& dbName, unsigned int timeout_ms, bool isTcpConn) : DBConnector(dbName, timeout_ms, isTcpConn, SonicDBKey()) { // Empty constructor } int DBConnector::getDbId() const { return m_dbId; } string DBConnector::getDbName() const { return m_dbName; } void DBConnector::setNamespace(const string& netns) { m_key.netns = netns; } string DBConnector::getNamespace() const { return m_key.netns; } void DBConnector::setDBKey(const SonicDBKey &key) { m_key = key; } SonicDBKey DBConnector::getDBKey() const { return m_key; } DBConnector *DBConnector::newConnector(unsigned int timeout) const { DBConnector *ret; if (getContext()->connection_type == REDIS_CONN_TCP) ret = new DBConnector(getDbId(), getContext()->tcp.host, getContext()->tcp.port, timeout); else ret = new DBConnector(getDbId(), getContext()->unix_sock.path, timeout); ret->m_dbName = m_dbName; ret->setDBKey(getDBKey()); return ret; } PubSub *DBConnector::pubsub() { return new PubSub(this); } int64_t DBConnector::del(const string &key) { RedisCommand sdel; sdel.format("DEL %s", key.c_str()); RedisReply r(this, sdel, REDIS_REPLY_INTEGER); return r.getContext()->integer; } bool DBConnector::exists(const string &key) { RedisCommand rexists; if (key.find_first_of(" \t") != string::npos) { SWSS_LOG_ERROR("EXISTS failed, invalid space or tab in single key: %s", key.c_str()); throw runtime_error("EXISTS failed, invalid space or tab in single key"); } rexists.format("EXISTS %s", key.c_str()); RedisReply r(this, rexists, REDIS_REPLY_INTEGER); return r.getContext()->integer > 0; } int64_t DBConnector::hdel(const string &key, const string &field) { RedisCommand shdel; shdel.format("HDEL %s %s", key.c_str(), field.c_str()); RedisReply r(this, shdel, REDIS_REPLY_INTEGER); return r.getContext()->integer; } int64_t DBConnector::hdel(const std::string &key, const std::vector<std::string> &fields) { RedisCommand shdel; shdel.formatHDEL(key, fields); RedisReply r(this, shdel, REDIS_REPLY_INTEGER); return r.getContext()->integer; } void DBConnector::hset(const string &key, const string &field, const string &value) { RedisCommand shset; shset.format("HSET %s %s %s", key.c_str(), field.c_str(), value.c_str()); RedisReply r(this, shset, REDIS_REPLY_INTEGER); } bool DBConnector::set(const string &key, const string &value) { RedisCommand sset; sset.format("SET %s %s", key.c_str(), value.c_str()); RedisReply r(this, sset, REDIS_REPLY_STATUS); string s = r.getReply<string>(); return s == "OK"; } bool DBConnector::set(const string &key, int value) { return set(key, to_string(value)); } void DBConnector::config_set(const std::string &key, const std::string &value) { RedisCommand sset; sset.format("CONFIG SET %s %s", key.c_str(), value.c_str()); RedisReply r(this, sset, REDIS_REPLY_STATUS); } bool DBConnector::flushdb() { RedisCommand sflushdb; sflushdb.format("FLUSHDB"); RedisReply r(this, sflushdb, REDIS_REPLY_STATUS); string s = r.getReply<string>(); return s == "OK"; } vector<string> DBConnector::keys(const string &key) { RedisCommand skeys; skeys.format("KEYS %s", key.c_str()); RedisReply r(this, skeys, REDIS_REPLY_ARRAY); auto ctx = r.getContext(); vector<string> list; for (unsigned int i = 0; i < ctx->elements; i++) list.emplace_back(ctx->element[i]->str); return list; } pair<int, vector<string>> DBConnector::scan(int cursor, const char *match, uint32_t count) { RedisCommand sscan; sscan.format("SCAN %d MATCH %s COUNT %u", cursor, match, count); RedisReply r(this, sscan, REDIS_REPLY_ARRAY); RedisReply r0(r.releaseChild(0)); r0.checkReplyType(REDIS_REPLY_STRING); RedisReply r1(r.releaseChild(1)); r1.checkReplyType(REDIS_REPLY_ARRAY); pair<int64_t, vector<string>> ret; string cur = r0.getReply<string>(); try { ret.first = stoi(cur); } catch(logic_error& ex) { throw system_error(make_error_code(errc::io_error), "Invalid cursor string returned by scan: " + cur); } for (size_t i = 0; i < r1.getChildCount(); i++) { RedisReply r11(r1.releaseChild(i)); ret.second.emplace_back(r11.getReply<string>()); } return ret; } int64_t DBConnector::incr(const string &key) { RedisCommand sincr; sincr.format("INCR %s", key.c_str()); RedisReply r(this, sincr, REDIS_REPLY_INTEGER); return r.getContext()->integer; } int64_t DBConnector::decr(const string &key) { RedisCommand sdecr; sdecr.format("DECR %s", key.c_str()); RedisReply r(this, sdecr, REDIS_REPLY_INTEGER); return r.getContext()->integer; } shared_ptr<string> DBConnector::get(const string &key) { RedisCommand sget; sget.format("GET %s", key.c_str()); RedisReply r(this, sget); auto reply = r.getContext(); if (reply->type == REDIS_REPLY_NIL) { return shared_ptr<string>(NULL); } if (reply->type == REDIS_REPLY_STRING) { shared_ptr<string> ptr(new string(reply->str)); return ptr; } throw runtime_error("GET failed, memory exception"); } shared_ptr<string> DBConnector::hget(const string &key, const string &field) { RedisCommand shget; shget.format("HGET %s %s", key.c_str(), field.c_str()); RedisReply r(this, shget); auto reply = r.getContext(); if (reply->type == REDIS_REPLY_NIL) { return shared_ptr<string>(NULL); } if (reply->type == REDIS_REPLY_STRING) { shared_ptr<string> ptr(new string(reply->str)); return ptr; } SWSS_LOG_ERROR("HGET failed, reply-type: %d, %s: %s", reply->type, key.c_str(), field.c_str()); throw runtime_error("HGET failed, unexpected reply type, memory exception"); } bool DBConnector::hexists(const string &key, const string &field) { RedisCommand rexists; rexists.format("HEXISTS %s %s", key.c_str(), field.c_str()); RedisReply r(this, rexists, REDIS_REPLY_INTEGER); return r.getContext()->integer > 0; } int64_t DBConnector::rpush(const string &list, const string &item) { RedisCommand srpush; srpush.format("RPUSH %s %s", list.c_str(), item.c_str()); RedisReply r(this, srpush, REDIS_REPLY_INTEGER); return r.getContext()->integer; } shared_ptr<string> DBConnector::blpop(const string &list, int timeout) { RedisCommand sblpop; sblpop.format("BLPOP %s %d", list.c_str(), timeout); RedisReply r(this, sblpop); auto reply = r.getContext(); if (reply->type == REDIS_REPLY_NIL) { return shared_ptr<string>(NULL); } if (reply->type == REDIS_REPLY_STRING) { shared_ptr<string> ptr(new string(reply->str)); return ptr; } throw runtime_error("GET failed, memory exception"); } void DBConnector::subscribe(const std::string &pattern) { std::string s("SUBSCRIBE "); s += pattern; RedisReply r(this, s, REDIS_REPLY_ARRAY); } void DBConnector::psubscribe(const std::string &pattern) { std::string s("PSUBSCRIBE "); s += pattern; RedisReply r(this, s, REDIS_REPLY_ARRAY); } void DBConnector::punsubscribe(const std::string &pattern) { std::string s("PUNSUBSCRIBE "); s += pattern; RedisReply r(this, s, REDIS_REPLY_ARRAY); } int64_t DBConnector::publish(const string &channel, const string &message) { RedisCommand publish; publish.format("PUBLISH %s %s", channel.c_str(), message.c_str()); RedisReply r(this, publish, REDIS_REPLY_INTEGER); return r.getReply<long long int>(); } void DBConnector::hmset(const std::unordered_map<std::string, std::vector<std::pair<std::string, std::string>>>& multiHash) { SWSS_LOG_ENTER(); RedisPipeline pipe(this); for (auto& hash : multiHash) { RedisCommand hset; hset.formatHSET(hash.first, hash.second.begin(), hash.second.end()); pipe.push(hset, REDIS_REPLY_INTEGER); } pipe.flush(); } void DBConnector::del(const std::vector<std::string>& keys) { SWSS_LOG_ENTER(); RedisPipeline pipe(this); for (auto& key : keys) { RedisCommand del; del.formatDEL(key); pipe.push(del, REDIS_REPLY_INTEGER); } pipe.flush(); } map<string, map<string, map<string, string>>> DBConnector::getall() { const string separator = SonicDBConfig::getSeparator(this); auto const& keys = this->keys("*"); map<string, map<string, map<string, string>>> data; for (string key: keys) { size_t pos = key.find(separator); if (pos == string::npos) { continue; } string table_name = key.substr(0, pos); string row = key.substr(pos + 1); auto const& entry = this->hgetall<map<string, string>>(key); if (!entry.empty()) { data[table_name][row] = entry; } } return data; }