// common/configdb.h
#pragma once
#include <string>
#include <map>
#include <vector>
#include "sonicv2connector.h"
#include "redistran.h"
namespace swss {
class ConfigDBConnector_Native : public SonicV2Connector_Native
{
public:
static constexpr const char *INIT_INDICATOR = "CONFIG_DB_INITIALIZED";
ConfigDBConnector_Native(bool use_unix_socket_path = false, const char *netns = "");
void db_connect(std::string db_name, bool wait_for_init = false, bool retry_on = false);
void connect(bool wait_for_init = true, bool retry_on = false);
virtual void set_entry(std::string table, std::string key, const std::map<std::string, std::string>& data);
virtual void mod_entry(std::string table, std::string key, const std::map<std::string, std::string>& data);
std::map<std::string, std::string> get_entry(std::string table, std::string key);
std::vector<std::string> get_keys(std::string table, bool split = true);
std::map<std::string, std::map<std::string, std::string>> get_table(std::string table);
void delete_table(std::string table);
virtual void mod_config(const std::map<std::string, std::map<std::string, std::map<std::string, std::string>>>& data);
virtual std::map<std::string, std::map<std::string, std::map<std::string, std::string>>> get_config();
std::string getKeySeparator() const;
std::string getTableNameSeparator() const;
std::string getDbName() const;
protected:
std::string m_table_name_separator = "|";
std::string m_key_separator = "|";
std::string m_db_name;
};
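// Usage sketch for the native API (illustrative only; assumes a reachable
// CONFIG_DB instance and the default separators declared above):
//
//     swss::ConfigDBConnector_Native config_db;
//     config_db.connect(true /* wait_for_init */);
//     config_db.set_entry("VLAN", "Vlan100", {{"vlanid", "100"}});
//     auto entry = config_db.get_entry("VLAN", "Vlan100");
//     config_db.mod_entry("VLAN", "Vlan100", {{"admin_status", "up"}});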
#if defined(SWIG) && defined(SWIGGO)
%insert(go_wrapper) %{
type ConfigDBConnector struct {
ConfigDBConnector_Native
}
func NewConfigDBConnector(a ...interface{}) *ConfigDBConnector {
return &ConfigDBConnector{
NewConfigDBConnector_Native(a...),
}
}
%}
#endif
#if defined(SWIG) && defined(SWIGPYTHON)
%pythoncode %{
## Note: diamond inheritance, reusing functions in both classes
class ConfigDBConnector(SonicV2Connector, ConfigDBConnector_Native):
## Note: there is no easy way for SWIG to map the C++ ctor parameter netns to the Python parameter namespace
def __init__(self, use_unix_socket_path = False, namespace = '', **kwargs):
if 'decode_responses' in kwargs and kwargs.pop('decode_responses') != True:
raise ValueError('decode_responses must be True if specified, False is not supported')
if namespace is None:
namespace = ''
super(ConfigDBConnector, self).__init__(use_unix_socket_path = use_unix_socket_path, namespace = namespace)
# Trick: to achieve static/instance method "overload", we must initialize the function in the ctor
# ref: https://stackoverflow.com/a/28766809/2514803
self.serialize_key = self._serialize_key
self.deserialize_key = self._deserialize_key
## Note: callbacks are difficult to implement via SWIG C++, so keep them in Python
self.handlers = {}
self.fire_init_data = {}
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_tb):
self.close()
pass
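# Context-manager usage sketch (illustrative only; assumes a local CONFIG_DB):
#
#     with ConfigDBConnector() as config_db:
#         config_db.connect()
#         vlan = config_db.get_entry('VLAN', 'Vlan100')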
@property
def KEY_SEPARATOR(self):
return self.getKeySeparator()
@property
def TABLE_NAME_SEPARATOR(self):
return self.getTableNameSeparator()
@property
def db_name(self):
return self.getDbName()
## Note: callbacks are difficult to implement via SWIG C++, so keep them in Python
def listen(self, init_data_handler=None):
## Start listening to Redis keyspace events. Pass a callback function as `init_data_handler` to handle the initial table data.
self.pubsub = self.get_redis_client(self.db_name).pubsub()
self.pubsub.psubscribe("__keyspace@{}__:*".format(self.get_dbid(self.db_name)))
# Build a cache of data for all subscribed tables that will receive the initial table data, so we don't send duplicate event notifications
init_data = {tbl: self.get_table(tbl) for tbl in self.handlers if init_data_handler or self.fire_init_data[tbl]}
# Function to send the initial data as a series of updates through the individual table callback handlers
def load_data(tbl, data):
if self.fire_init_data[tbl]:
for row, x in data.items():
self.__fire(tbl, row, x)
return False
return True
init_callback_data = {tbl: data for tbl, data in init_data.items() if load_data(tbl, data)}
# Pass all initial data that we did NOT send as updates to the handlers through the init callback, if provided by the caller
if init_data_handler:
init_data_handler(init_callback_data)
while True:
item = self.pubsub.listen_message(interrupt_on_signal=True)
if 'type' not in item:
# When timed out or interrupted, item will not contain 'type'
continue
if item['type'] == 'pmessage':
key = item['channel'].split(':', 1)[1]
try:
(table, row) = key.split(self.TABLE_NAME_SEPARATOR, 1)
if table in self.handlers:
if item['data'] == 'del':
data = None
else:
client = self.get_redis_client(self.db_name)
data = self.raw_to_typed(client.hgetall(key))
if table in init_data and row in init_data[table]:
cache_hit = init_data[table][row] == data
del init_data[table][row]
if not init_data[table]:
del init_data[table]
if cache_hit: continue
self.__fire(table, row, data)
except ValueError:
pass  # Ignore non-table-formatted Redis entries
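# Subscription usage sketch (illustrative only; table and handler names are examples):
#
#     def on_vlan_change(table, key, data):   # data is None when the entry is deleted
#         print(table, key, data)
#
#     config_db = ConfigDBConnector()
#     config_db.connect()
#     config_db.subscribe('VLAN', on_vlan_change)
#     config_db.listen()   # blocks, firing the registered handlers on keyspace events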
## Dynamic typed functions used in python
@staticmethod
def raw_to_typed(raw_data):
if raw_data is None:
return None
typed_data = {}
for raw_key in raw_data:
key = raw_key
# "NULL:NULL" is used as a placeholder for objects with no attributes
if key == "NULL":
pass
# A column key with ending '@' is used to mark list-typed table items
# TODO: Replace this with a schema-based typing mechanism.
elif key.endswith("@"):
value = raw_data[raw_key].split(',')
typed_data[key[:-1]] = value
else:
typed_data[key] = raw_data[raw_key]
return typed_data
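# Illustrative example of the conventions handled above:
#
#     raw_to_typed({'members@': 'Ethernet0,Ethernet4', 'mtu': '9100', 'NULL': 'NULL'})
#     # -> {'members': ['Ethernet0', 'Ethernet4'], 'mtu': '9100'}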
@staticmethod
def typed_to_raw(typed_data):
if typed_data is None:
return {}
elif len(typed_data) == 0:
return { "NULL": "NULL" }
raw_data = {}
for key in typed_data:
value = typed_data[key]
if type(value) is list:
raw_data[key+'@'] = ','.join(value)
else:
raw_data[key] = str(value)
return raw_data
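# Illustrative example of the reverse mapping:
#
#     typed_to_raw({'members': ['Ethernet0', 'Ethernet4'], 'mtu': 9100})
#     # -> {'members@': 'Ethernet0,Ethernet4', 'mtu': '9100'}
#     typed_to_raw({})
#     # -> {'NULL': 'NULL'}, the placeholder for objects with no attributes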
# Note: we could not use a class variable for KEY_SEPARATOR, but the original dependent code uses
# these static functions. So we implement both static and instance functions with the same name.
# The static functions fall back to the default ConfigDB separator '|'.
@staticmethod
def serialize_key(key, separator='|'):
if type(key) is tuple:
return separator.join(key)
else:
return str(key)
def _serialize_key(self, key):
return ConfigDBConnector.serialize_key(key, self.KEY_SEPARATOR)
@staticmethod
def deserialize_key(key, separator='|'):
tokens = key.split(separator)
if len(tokens) > 1:
return tuple(tokens)
else:
return key
def _deserialize_key(self, key):
return ConfigDBConnector.deserialize_key(key, self.KEY_SEPARATOR)
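# Key (de)serialization sketch (illustrative values):
#
#     ConfigDBConnector.serialize_key(('Vlan100', 'Ethernet0'))   # -> 'Vlan100|Ethernet0'
#     ConfigDBConnector.deserialize_key('Vlan100|Ethernet0')      # -> ('Vlan100', 'Ethernet0')
#     ConfigDBConnector.deserialize_key('Vlan100')                # -> 'Vlan100'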
def __fire(self, table, key, data):
if table in self.handlers:
handler = self.handlers[table]
handler(table, key, data)
def subscribe(self, table, handler, fire_init_data=False):
self.handlers[table] = handler
self.fire_init_data[table] = fire_init_data
def unsubscribe(self, table):
if table in self.handlers:
self.handlers.pop(table)
def set_entry(self, table, key, data):
key = self.serialize_key(key)
raw_data = self.typed_to_raw(data)
super(ConfigDBConnector, self).set_entry(table, key, raw_data)
def mod_config(self, data):
raw_config = {}
for table_name, table_data in data.items():
if table_data == {}:
# When table data is {}, take no action (the table is skipped entirely).
continue
raw_config[table_name] = {}
if table_data == None:
# When table data is None, leave raw_config[table_name] empty so that the native mod_config deletes the table.
continue
for key, data in table_data.items():
raw_key = self.serialize_key(key)
raw_data = self.typed_to_raw(data)
raw_config[table_name][raw_key] = raw_data
super(ConfigDBConnector, self).mod_config(raw_config)
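# mod_config input sketch (illustrative table and field names):
#
#     config_db.mod_config({
#         'VLAN': {'Vlan100': {'vlanid': '100'}},   # create/update entries
#         'VLAN_MEMBER': None,                      # delete the whole table
#         'PORT': {},                               # no action
#     })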
def mod_entry(self, table, key, data):
key = self.serialize_key(key)
raw_data = self.typed_to_raw(data)
super(ConfigDBConnector, self).mod_entry(table, key, raw_data)
def get_entry(self, table, key):
key = self.serialize_key(key)
raw_data = super(ConfigDBConnector, self).get_entry(table, key)
return self.raw_to_typed(raw_data)
def get_keys(self, table, split=True):
keys = super(ConfigDBConnector, self).get_keys(table, split)
ret = []
for key in keys:
ret.append(self.deserialize_key(key))
return ret
def get_table(self, table):
data = super(ConfigDBConnector, self).get_table(table)
ret = {}
for row, entry in data.items():
entry = self.raw_to_typed(entry)
ret[self.deserialize_key(row)] = entry
return ret
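# get_table return-shape sketch (illustrative values); compound keys come back as tuples:
#
#     config_db.get_table('VLAN_MEMBER')
#     # -> {('Vlan100', 'Ethernet0'): {'tagging_mode': 'untagged'}, ...}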
def get_config(self):
data = super(ConfigDBConnector, self).get_config()
ret = {}
for table_name, table in data.items():
for row, entry in table.items():
entry = self.raw_to_typed(entry)
ret.setdefault(table_name, {})[self.deserialize_key(row)] = entry
return ret
%}
#endif
#if defined(SWIG) && defined(SWIGPYTHON) && defined(ENABLE_YANG_MODULES)
%pythoncode %{
class YangDefaultDecorator(object):
def __init__(self, config_db_connector):
self.connector = config_db_connector
self.default_value_provider = DefaultValueProvider()
# Helper method for appending default values to the result.
def _append_default_value(self, table, key, data):
if data is None or len(data) == 0:
# an empty entry means the entry has been deleted
return data
serialized_key = self.connector.serialize_key(key)
defaultValues = self.default_value_provider.getDefaultValues(table, serialized_key)
for field in defaultValues:
if field not in data:
data[field] = defaultValues[field]
# override read APIs
def new_get_entry(self, table, key):
result = self.connector.get_entry(table, key)
self._append_default_value(table, key, result)
return result
def new_get_table(self, table):
result = self.connector.get_table(table)
for key in result:
self._append_default_value(table, key, result[key])
return result
def new_get_config(self):
result = self.connector.get_config()
for table in result:
for key in result[table]:
# Cannot pass result[table][key] as the parameter here, because Python will create a copy. Re-assign the entry to result to bypass this issue.
entry = result[table][key]
self._append_default_value(table, key, entry)
result[table][key] = entry
return result
def __getattr__(self, name):
if name == "get_entry":
return self.new_get_entry
elif name == "get_table":
return self.new_get_table
elif name == "get_config":
return self.new_get_config
originalMethod = self.connector.__getattribute__(name)
return originalMethod
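# Decorator usage sketch (illustrative only; requires a build with ENABLE_YANG_MODULES
# so that DefaultValueProvider is available):
#
#     config_db = ConfigDBConnector()
#     config_db.connect()
#     decorated = YangDefaultDecorator(config_db)
#     entry = decorated.get_entry('VLAN', 'Vlan100')   # fields missing from the DB are
#                                                      # filled with YANG default values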
%}
#endif
class ConfigDBPipeConnector_Native: public ConfigDBConnector_Native
{
public:
ConfigDBPipeConnector_Native(bool use_unix_socket_path = false, const char *netns = "");
void set_entry(std::string table, std::string key, const std::map<std::string, std::string>& data) override;
void mod_config(const std::map<std::string, std::map<std::string, std::map<std::string, std::string>>>& data) override;
std::map<std::string, std::map<std::string, std::map<std::string, std::string>>> get_config() override;
private:
static const int64_t REDIS_SCAN_BATCH_SIZE = 30;
int _delete_entries(DBConnector& client, RedisTransactioner& pipe, const char *pattern, int cursor);
void _delete_table(DBConnector& client, RedisTransactioner& pipe, std::string table);
void _set_entry(RedisTransactioner& pipe, std::string table, std::string key, const std::map<std::string, std::string>& data);
void _mod_entry(RedisTransactioner& pipe, std::string table, std::string key, const std::map<std::string, std::string>& data);
int _get_config(DBConnector& client, RedisTransactioner& pipe, std::map<std::string, std::map<std::string, std::map<std::string, std::string>>>& data, int cursor);
};
#if defined(SWIG) && defined(SWIGPYTHON)
%pythoncode %{
class ConfigDBPipeConnector(ConfigDBConnector, ConfigDBPipeConnector_Native):
## Note: diamond inheritance, reusing functions in both classes
def __init__(self, **kwargs):
super(ConfigDBPipeConnector, self).__init__(**kwargs)
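# Drop-in usage sketch (illustrative only): same API as ConfigDBConnector, but bulk
# operations such as mod_config() run through a Redis pipeline on the native side:
#
#     config_db = ConfigDBPipeConnector()
#     config_db.connect()
#     config_db.mod_config(full_config)   # full_config: the nested dict format shown above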
%}
#endif
}