common/zmqproducerstatetable.cpp (153 lines of code) (raw):

#include <stdlib.h> #include <tuple> #include <sstream> #include <utility> #include <algorithm> #include <chrono> #include <cmath> #include <zmq.h> #include "redisreply.h" #include "table.h" #include "redisapi.h" #include "redispipeline.h" #include "zmqproducerstatetable.h" #include "zmqconsumerstatetable.h" #include "binaryserializer.h" using namespace std; namespace swss { ZmqProducerStateTable::ZmqProducerStateTable(DBConnector *db, const string &tableName, ZmqClient &zmqClient, bool dbPersistence) : ProducerStateTable(db, tableName), m_zmqClient(zmqClient), m_dbName(db->getDbName()), m_tableNameStr(tableName) { initialize(db, tableName, dbPersistence); } ZmqProducerStateTable::ZmqProducerStateTable(RedisPipeline *pipeline, const string &tableName, ZmqClient &zmqClient, bool buffered, bool dbPersistence) : ProducerStateTable(pipeline, tableName, buffered), m_zmqClient(zmqClient), m_dbName(pipeline->getDbName()), m_tableNameStr(tableName) { initialize(pipeline->getDBConnector(), tableName, dbPersistence); } void ZmqProducerStateTable::initialize(DBConnector *db, const std::string &tableName, bool dbPersistence) { if (dbPersistence) { SWSS_LOG_DEBUG("Database persistence enabled, tableName: %s", tableName.c_str()); m_asyncDBUpdater = std::make_unique<AsyncDBUpdater>(db, tableName); } else { SWSS_LOG_DEBUG("Database persistence disabled, tableName: %s", tableName.c_str()); m_asyncDBUpdater = nullptr; } } void ZmqProducerStateTable::set( const string &key, const vector<FieldValueTuple> &values, const string &op /*= SET_COMMAND*/, const string &prefix) { std::vector<KeyOpFieldsValuesTuple> kcos = std::vector<KeyOpFieldsValuesTuple>{ KeyOpFieldsValuesTuple{key, op, values} }; m_zmqClient.sendMsg( m_dbName, m_tableNameStr, kcos); if (m_asyncDBUpdater != nullptr) { // async write need keep data till write to DB std::shared_ptr<KeyOpFieldsValuesTuple> clone = std::make_shared<KeyOpFieldsValuesTuple>(); kfvKey(*clone) = key; kfvOp(*clone) = op; for(const auto &value : values) { kfvFieldsValues(*clone).push_back(value); } m_asyncDBUpdater->update(clone); } } void ZmqProducerStateTable::del( const string &key, const string &op /*= DEL_COMMAND*/, const string &prefix) { std::vector<KeyOpFieldsValuesTuple> kcos = std::vector<KeyOpFieldsValuesTuple>{ KeyOpFieldsValuesTuple{key, op, std::vector<FieldValueTuple>{}} }; m_zmqClient.sendMsg( m_dbName, m_tableNameStr, kcos); if (m_asyncDBUpdater != nullptr) { // async write need keep data till write to DB std::shared_ptr<KeyOpFieldsValuesTuple> clone = std::make_shared<KeyOpFieldsValuesTuple>(); kfvKey(*clone) = key; kfvOp(*clone) = op; m_asyncDBUpdater->update(clone); } } void ZmqProducerStateTable::set(const std::vector<KeyOpFieldsValuesTuple> &values) { m_zmqClient.sendMsg( m_dbName, m_tableNameStr, values); if (m_asyncDBUpdater != nullptr) { for (const auto &value : values) { // async write need keep data till write to DB std::shared_ptr<KeyOpFieldsValuesTuple> clone = std::make_shared<KeyOpFieldsValuesTuple>(value); m_asyncDBUpdater->update(clone); } } } void ZmqProducerStateTable::del(const std::vector<std::string> &keys) { std::vector<KeyOpFieldsValuesTuple> kcos; for (const auto &key : keys) { kcos.push_back(KeyOpFieldsValuesTuple{key, DEL_COMMAND, std::vector<FieldValueTuple>{}}); } m_zmqClient.sendMsg( m_dbName, m_tableNameStr, kcos); if (m_asyncDBUpdater != nullptr) { for (const auto &key : keys) { // async write need keep data till write to DB std::shared_ptr<KeyOpFieldsValuesTuple> clone = std::make_shared<KeyOpFieldsValuesTuple>(); kfvKey(*clone) = key; kfvOp(*clone) = DEL_COMMAND; m_asyncDBUpdater->update(clone); } } } void ZmqProducerStateTable::send(const std::vector<KeyOpFieldsValuesTuple> &kcos) { m_zmqClient.sendMsg( m_dbName, m_tableNameStr, kcos); if (m_asyncDBUpdater != nullptr) { for (const auto &value : kcos) { // async write need keep data till write to DB std::shared_ptr<KeyOpFieldsValuesTuple> clone = std::make_shared<KeyOpFieldsValuesTuple>(value); m_asyncDBUpdater->update(clone); } } } size_t ZmqProducerStateTable::dbUpdaterQueueSize() { if (m_asyncDBUpdater == nullptr) { throw system_error(make_error_code(errc::operation_not_supported), "Database persistence is not enabled"); } return m_asyncDBUpdater->queueSize(); } }