common/configdb.cpp (346 lines of code) (raw):
#include <boost/algorithm/string.hpp>
#include <map>
#include <stdexcept>
#include <vector>
#include "configdb.h"
#include "pubsub.h"
#include "converter.h"
using namespace std;
using namespace swss;
// Build a connector on top of SonicV2Connector_Native.
// Both separators start out as "|"; db_connect() overwrites them with the
// separator returned by get_db_separator() for the database it connects to.
// Args:
//     use_unix_socket_path: Forwarded to the SonicV2Connector_Native constructor.
//     netns: Forwarded to the SonicV2Connector_Native constructor.
ConfigDBConnector_Native::ConfigDBConnector_Native(bool use_unix_socket_path, const char *netns)
    : SonicV2Connector_Native(use_unix_socket_path, netns)
    , m_table_name_separator("|")
    , m_key_separator("|")
{
}
// Connect to the named database and, optionally, block until it is initialized.
// Args:
//     db_name: Name of the database to connect to. It is remembered in
//              m_db_name and used to look up the key/table separators.
//     wait_for_init: When true, do not return until the INIT_INDICATOR key
//                    holds a non-empty value.
//     retry_on: Forwarded to SonicV2Connector_Native::connect.
void ConfigDBConnector_Native::db_connect(string db_name, bool wait_for_init, bool retry_on)
{
    m_db_name = db_name;
    m_key_separator = m_table_name_separator = get_db_separator(db_name);
    SonicV2Connector_Native::connect(m_db_name, retry_on);

    if (!wait_for_init)
    {
        return;
    }

    auto& client = get_redis_client(m_db_name);
    auto pubsub = make_shared<PubSub>(&client);
    auto initialized = client.get(INIT_INDICATOR);
    if (initialized && !initialized->empty())
    {
        // Already initialized, nothing to wait for.
        return;
    }

    // Watch keyspace events for the indicator key in this database.
    string pattern = "__keyspace@" + to_string(get_dbid(m_db_name)) + "__:" + INIT_INDICATOR;
    pubsub->psubscribe(pattern);

    // Re-read the indicator now that the subscription is active. If it was
    // written between the first read above and psubscribe(), its notification
    // has already been missed and listen_message() could otherwise block
    // forever waiting for an event that will never come.
    initialized = client.get(INIT_INDICATOR);
    while (!initialized || initialized->empty())
    {
        auto item = pubsub->listen_message();
        if (item["type"] == "pmessage")
        {
            // The key name is whatever follows the first ':' in the channel.
            string channel = item["channel"];
            size_t pos = channel.find(':');
            string key;
            if (pos != string::npos)
            {
                key = channel.substr(pos + 1);
            }
            if (key == INIT_INDICATOR)
            {
                initialized = client.get(INIT_INDICATOR);
            }
        }
    }
    pubsub->punsubscribe(pattern);
}
// Connect to the "CONFIG_DB" database.
// Args:
//     wait_for_init: Forwarded to db_connect.
//     retry_on: Forwarded to db_connect.
void ConfigDBConnector_Native::connect(bool wait_for_init, bool retry_on)
{
    db_connect("CONFIG_DB", wait_for_init, retry_on);
}
// Write a table entry to config db, replacing whatever was stored before.
// Fields present in the db but absent from `data` are removed.
// Args:
//     table: Table name (upper-cased before use).
//     key: Key of the table entry, appended after the table-name separator.
//     data: Table row data as {'column_key': 'value', ...}.
//           An empty map deletes the entry.
void ConfigDBConnector_Native::set_entry(string table, string key, const map<string, string>& data)
{
    auto& client = get_redis_client(m_db_name);
    const string redis_key = to_upper(table) + m_table_name_separator + key;

    if (data.empty())
    {
        client.del(redis_key);
        return;
    }

    // Snapshot the stored fields first so stale ones can be pruned after the write.
    auto previous = get_entry(table, key);
    client.hmset(redis_key, data.begin(), data.end());
    for (auto& field_value: previous)
    {
        if (data.count(field_value.first) == 0)
        {
            client.hdel(redis_key, field_value.first);
        }
    }
}
// Modify a table entry in config db.
// Fields named in `data` are written; fields already stored under the entry
// but not named in `data` are not touched by this call.
// Args:
//     table: Table name (upper-cased before use).
//     key: Key of the table entry, appended after the table-name separator.
//     data: Table row data as {'column_key': 'value', ...}.
//           An empty map deletes the entry.
void ConfigDBConnector_Native::mod_entry(string table, string key, const map<string, string>& data)
{
    auto& client = get_redis_client(m_db_name);
    const string redis_key = to_upper(table) + m_table_name_separator + key;

    if (!data.empty())
    {
        client.hmset(redis_key, data.begin(), data.end());
        return;
    }
    client.del(redis_key);
}
// Read a table entry from config db.
// Args:
//     table: Table name (upper-cased before use).
//     key: Key of the table entry, appended after the table-name separator.
// Returns:
//     The result of HGETALL on the entry as {'column_key': 'value', ...}.
map<string, string> ConfigDBConnector_Native::get_entry(string table, string key)
{
    auto& client = get_redis_client(m_db_name);
    const string redis_key = to_upper(table) + m_table_name_separator + key;
    auto fields = client.hgetall<map<string, string>>(redis_key);
    return fields;
}
// Read all keys of a table from config db.
// Args:
//     table: Table name (upper-cased before use).
//     split: When true, strip everything up to and including the first
//            table-name separator and return only the remainder of each key.
//            When false, return the full redis keys.
// Returns:
//     List of keys matching "<TABLE><separator>*".
vector<string> ConfigDBConnector_Native::get_keys(string table, bool split)
{
    auto& client = get_redis_client(m_db_name);
    const string pattern = to_upper(table) + m_table_name_separator + "*";
    const auto& matched = client.keys(pattern);

    vector<string> result;
    for (auto& full_key: matched)
    {
        if (!split)
        {
            result.push_back(full_key);
            continue;
        }

        // A key without a separator contributes an empty row name.
        const size_t sep = full_key.find(m_table_name_separator);
        result.push_back(sep == string::npos ? string() : full_key.substr(sep + 1));
    }
    return result;
}
// Read an entire table from config db.
// Args:
//     table: Table name (upper-cased before use).
// Returns:
//     Table data as { 'row_key': {'column_key': value, ...}, ... }, where
//     'row_key' is the part of the redis key after the first table-name
//     separator. Empty if no key matches "<TABLE><separator>*".
map<string, map<string, string>> ConfigDBConnector_Native::get_table(string table)
{
    auto& client = get_redis_client(m_db_name);
    const string pattern = to_upper(table) + m_table_name_separator + "*";
    const auto& matched = client.keys(pattern);

    map<string, map<string, string>> result;
    for (auto& full_key: matched)
    {
        auto const& fields = client.hgetall<map<string, string>>(full_key);
        const size_t sep = full_key.find(m_table_name_separator);
        if (sep != string::npos)
        {
            result[full_key.substr(sep + 1)] = fields;
        }
    }
    return result;
}
// Delete an entire table from config db.
// Every key matching "<TABLE><separator>*" is deleted, one DEL per key.
// Args:
//     table: Table name (upper-cased before use).
void ConfigDBConnector_Native::delete_table(string table)
{
    auto& client = get_redis_client(m_db_name);
    const string pattern = to_upper(table) + m_table_name_separator + "*";
    const auto& matched = client.keys(pattern);
    for (auto& full_key: matched)
    {
        client.del(full_key);
    }
}
// Write multiple tables into config db.
// Extra entries/fields in the db which are not in the data are kept.
// Args:
// data: config data in a dictionary form
// {
// 'TABLE_NAME': { 'row_key': {'column_key': 'value', ...}, ...},
// 'MULTI_KEY_TABLE_NAME': { ('l1_key', 'l2_key', ...) : {'column_key': 'value', ...}, ...},
// ...
// }
void ConfigDBConnector_Native::mod_config(const map<string, map<string, map<string, string>>>& data)
{
for (auto const& it: data)
{
string table_name = it.first;
auto const& table_data = it.second;
if (table_data.empty())
{
delete_table(table_name);
continue;
}
for (auto const& ie: table_data)
{
string key = ie.first;
auto const& fvs = ie.second;
mod_entry(table_name, key, fvs);
}
}
}
// Read all config data.
// Returns:
//     Whatever getall() on the redis client for m_db_name returns, in the form
//     {
//         'TABLE_NAME': { 'row_key': {'column_key': 'value', ...}, ...},
//         ...
//     }
map<string, map<string, map<string, string>>> ConfigDBConnector_Native::get_config()
{
    return get_redis_client(m_db_name).getall();
}
// Returns:
//     A copy of the key separator currently in use.
std::string ConfigDBConnector_Native::getKeySeparator() const
{
    return m_key_separator;
}
// Returns:
//     A copy of the separator placed between the table name and the row key.
std::string ConfigDBConnector_Native::getTableNameSeparator() const
{
    return m_table_name_separator;
}
// Returns:
//     A copy of the database name stored by the last db_connect() call.
std::string ConfigDBConnector_Native::getDbName() const
{
    return m_db_name;
}
// Build a pipeline-based connector; both arguments are forwarded unchanged
// to the ConfigDBConnector_Native constructor.
ConfigDBPipeConnector_Native::ConfigDBPipeConnector_Native(bool use_unix_socket_path, const char *netns)
    : ConfigDBConnector_Native(use_unix_socket_path, netns)
{
}
// Helper that queues deletion of one SCAN batch of keys on a Redis pipeline.
// A single SCAN with a count hint of REDIS_SCAN_BATCH_SIZE is issued and one
// DEL per returned key is enqueued. Nothing is executed here; the caller
// runs the pipeline once ready.
// Args:
//     client: Redis client used for the SCAN
//     pipe: Redis DB pipe the DEL commands are queued on
//     pattern: key pattern
//     cursor: position to start scanning from
//
// Returns:
//     cur: position of the next item to scan
int ConfigDBPipeConnector_Native::_delete_entries(DBConnector& client, RedisTransactioner& pipe, const char *pattern, int cursor)
{
    const auto& scan_result = client.scan(cursor, pattern, REDIS_SCAN_BATCH_SIZE);
    auto next_cursor = scan_result.first;
    for (auto const& key: scan_result.second)
    {
        RedisCommand del_cmd;
        del_cmd.format("DEL %s", key.c_str());
        pipe.enqueue(del_cmd, REDIS_REPLY_INTEGER);
    }
    return next_cursor;
}
// Helper that queues deletion of every entry of a table on a Redis pipeline.
// Keys matching "<TABLE><separator>*" are scanned batch by batch until the
// scan cursor returns to 0. The caller runs the pipeline once ready.
// Args:
//     client: Redis client used for scanning
//     pipe: Redis DB pipe the DEL commands are queued on
//     table: Table name (upper-cased before use).
void ConfigDBPipeConnector_Native::_delete_table(DBConnector& client, RedisTransactioner& pipe, string table)
{
    const string pattern = to_upper(table) + m_table_name_separator + "*";
    int cursor = 0;
    do
    {
        cursor = _delete_entries(client, pipe, pattern.c_str(), cursor);
    } while (cursor != 0);
}
// Write a table entry to config db inside a MULTI/EXEC transaction.
// Fields present in the db but absent from `data` are removed.
// Args:
//     table: Table name
//     key: Key of the table entry
//     data: Table row data as {'column_key': 'value', ...}
//           An empty map deletes the entry
void ConfigDBPipeConnector_Native::set_entry(string table, string key, const map<string, string>& data)
{
    // The transaction runs on its own connector built from the shared client.
    DBConnector pipe_connector(get_redis_client(m_db_name));
    RedisTransactioner transaction(&pipe_connector);

    transaction.multi();
    _set_entry(transaction, table, key, data);
    transaction.exec();
}
// Queue the commands that write a table entry to config db.
// Fields present in the db but absent from `data` get an HDEL queued.
// Nothing is executed here; the caller runs the pipeline.
// Args:
//     pipe: Redis DB pipe the commands are queued on
//     table: Table name
//     key: Key of the table entry
//     data: Table row data as {'column_key': 'value', ...}
//           An empty map queues deletion of the entry
void ConfigDBPipeConnector_Native::_set_entry(RedisTransactioner& pipe, std::string table, std::string key, const std::map<std::string, std::string>& data)
{
    const string redis_key = to_upper(table) + m_table_name_separator + key;

    if (data.empty())
    {
        RedisCommand del_cmd;
        del_cmd.format("DEL %s", redis_key.c_str());
        pipe.enqueue(del_cmd, REDIS_REPLY_INTEGER);
        return;
    }

    // The stored fields are read through get_entry, not through `pipe`.
    auto previous = get_entry(table, key);

    RedisCommand hset_cmd;
    hset_cmd.formatHSET(redis_key, data.begin(), data.end());
    pipe.enqueue(hset_cmd, REDIS_REPLY_INTEGER);

    for (auto& field_value: previous)
    {
        if (data.count(field_value.first) != 0)
        {
            continue;
        }
        RedisCommand hdel_cmd;
        hdel_cmd.formatHDEL(redis_key, field_value.first);
        pipe.enqueue(hdel_cmd, REDIS_REPLY_INTEGER);
    }
}
// Queue the command that modifies a table entry in config db.
// Nothing is executed here; the caller runs the pipeline.
// Args:
//     pipe: Redis DB pipe the command is queued on
//     table: Table name (upper-cased before use).
//     key: Key of the table entry, appended after the table-name separator.
//     data: Table row data as {'column_key': 'value', ...}.
//           An empty map queues a DEL of the entry; otherwise an HSET of
//           the given fields is queued.
void ConfigDBPipeConnector_Native::_mod_entry(RedisTransactioner& pipe, string table, string key, const map<string, string>& data)
{
    const string redis_key = to_upper(table) + m_table_name_separator + key;

    if (!data.empty())
    {
        RedisCommand hset_cmd;
        hset_cmd.formatHSET(redis_key, data.begin(), data.end());
        pipe.enqueue(hset_cmd, REDIS_REPLY_INTEGER);
        return;
    }

    RedisCommand del_cmd;
    del_cmd.format("DEL %s", redis_key.c_str());
    pipe.enqueue(del_cmd, REDIS_REPLY_INTEGER);
}
// Write multiple tables into config db.
// Extra entries/fields in the db which are not in the data are kept.
// Args:
// data: config data in a dictionary form
// {
// 'TABLE_NAME': { 'row_key': {'column_key': 'value', ...}, ...},
// 'MULTI_KEY_TABLE_NAME': { ('l1_key', 'l2_key', ...) : {'column_key': 'value', ...}, ...},
// ...
// }
void ConfigDBPipeConnector_Native::mod_config(const map<string, map<string, map<string, string>>>& data)
{
auto& client = get_redis_client(m_db_name);
DBConnector clientPipe(client);
RedisTransactioner pipe(&clientPipe);
pipe.multi();
for (auto const& id: data)
{
auto& table_name = id.first;
auto& table_data = id.second;
if (table_data.empty())
{
_delete_table(client, pipe, table_name);
continue;
}
for (auto const& it: table_data)
{
auto& key = it.first;
_mod_entry(pipe, table_name, key, it.second);
}
}
pipe.exec();
}
// Read one batch of config data using a Redis pipeline.
// A single SCAN over "*" with a count hint of REDIS_SCAN_BATCH_SIZE is issued
// on `client`. An HGETALL is queued on `pipe` for every returned key that
// contains the table-name separator, and the replies are merged into `data`.
// Keys without the separator are ignored.
// Args:
//     client: Redis client used for the SCAN
//     pipe: Redis DB pipe the HGETALL commands run on
//     data: config dictionary to add the batch to
//     cursor: position to start scanning from
//
// Returns:
//     cur: position of next item to scan
// Throws:
//     std::runtime_error if the pipeline EXEC reports failure.
int ConfigDBPipeConnector_Native::_get_config(DBConnector& client, RedisTransactioner& pipe, map<string, map<string, map<string, string>>>& data, int cursor)
{
    auto const& rc = client.scan(cursor, "*", REDIS_SCAN_BATCH_SIZE);
    auto cur = rc.first;
    auto const& keys = rc.second;

    pipe.multi();
    for (auto const& key: keys)
    {
        if (key.find(m_table_name_separator) == string::npos)
        {
            continue;
        }
        RedisCommand shgetall;
        shgetall.format("HGETALL %s", key.c_str());
        pipe.enqueue(shgetall, REDIS_REPLY_ARRAY);
    }
    // Do not consume replies from a transaction that did not succeed.
    if (!pipe.exec())
    {
        throw std::runtime_error("ConfigDBPipeConnector_Native::_get_config: EXEC failed while reading config");
    }

    // Replies arrive in enqueue order, so walk the keys with the same filter.
    for (auto const& key: keys)
    {
        size_t pos = key.find(m_table_name_separator);
        if (pos == string::npos)
        {
            continue;
        }
        string table_name = key.substr(0, pos);
        string row = key.substr(pos + 1);

        auto reply = pipe.dequeueReply();
        RedisReply r(reply);
        auto& dataentry = data[table_name][row];
        // Children alternate field, value; never read past a trailing odd element.
        for (unsigned int i = 0; i + 1 < r.getChildCount(); i += 2)
        {
            string field = r.getChild(i)->str;
            string value = r.getChild(i+1)->str;
            dataentry.emplace(field, value);
        }
    }
    return cur;
}
// Read all config data batch by batch through a Redis pipeline.
// _get_config is called repeatedly, starting at cursor 0, until it returns
// cursor 0 again.
// Returns:
//     Config data in the form
//     {
//         'TABLE_NAME': { 'row_key': {'column_key': 'value', ...}, ...},
//         ...
//     }
map<string, map<string, map<string, string>>> ConfigDBPipeConnector_Native::get_config()
{
    auto& client = get_redis_client(m_db_name);
    DBConnector pipe_connector(client);
    RedisTransactioner transaction(&pipe_connector);

    map<string, map<string, map<string, string>>> config;
    int cursor = 0;
    do
    {
        cursor = _get_config(client, transaction, config, cursor);
    } while (cursor != 0);
    return config;
}