common/producertable.cpp (110 lines of code) (raw):

#include <stdlib.h> #include <tuple> #include "common/redisreply.h" #include "common/producertable.h" #include "common/json.h" #include <nlohmann/json.hpp> #include "common/logger.h" #include "common/redisapi.h" using namespace std; using json = nlohmann::json; namespace swss { ProducerTable::ProducerTable(DBConnector *db, const string &tableName) : ProducerTable(new RedisPipeline(db, 1), tableName, false) { m_pipeowned = true; } ProducerTable::ProducerTable(RedisPipeline *pipeline, const string &tableName, bool buffered) : TableBase(tableName, SonicDBConfig::getSeparator(pipeline->getDbName())) , TableName_KeyValueOpQueues(tableName) , m_buffered(buffered) , m_pipeowned(false) , m_pipe(pipeline) { /* * KEYS[1] : tableName + "_KEY_VALUE_OP_QUEUE * ARGV[1] : key * ARGV[2] : value * ARGV[3] : op * KEYS[2] : tableName + "_CHANNEL" * ARGV[4] : "G" */ string luaEnque = "redis.call('LPUSH', KEYS[1], ARGV[1], ARGV[2], ARGV[3]);" "redis.call('PUBLISH', KEYS[2], ARGV[4]);"; m_shaEnque = m_pipe->loadRedisScript(luaEnque); } ProducerTable::ProducerTable(DBConnector *db, const string &tableName, const string &dumpFile) : ProducerTable(db, tableName) { m_dumpFile.open(dumpFile, fstream::out | fstream::trunc); m_dumpFile << "[" << endl; } ProducerTable::~ProducerTable() { if (m_dumpFile.is_open()) { m_dumpFile << endl << "]" << endl; m_dumpFile.close(); } if (m_pipeowned) { delete m_pipe; } } void ProducerTable::setBuffered(bool buffered) { m_buffered = buffered; } void ProducerTable::enqueueDbChange(const string &key, const string &value, const string &op, const string& /* prefix */) { RedisCommand command; command.format( "EVALSHA %s 2 %s %s %s %s %s %s", m_shaEnque.c_str(), getKeyValueOpQueueTableName().c_str(), getChannelName(m_pipe->getDbId()).c_str(), key.c_str(), value.c_str(), op.c_str(), "G"); m_pipe->push(command, REDIS_REPLY_NIL); } void ProducerTable::set(const string &key, const vector<FieldValueTuple> &values, const string &op, const string &prefix) { if (m_dumpFile.is_open()) { if (!m_firstItem) m_dumpFile << "," << endl; else m_firstItem = false; json j; string json_key = getKeyName(key); j[json_key] = json::object(); for (const auto &it : values) j[json_key][fvField(it)] = fvValue(it); j["OP"] = op; m_dumpFile << j.dump(4); } enqueueDbChange(key, JSon::buildJson(values), "S" + op, prefix); // Only buffer "set", "bulkset" or "create" operations if (!m_buffered || (op != "create" && op != "set" && op != "bulkset" )) { m_pipe->flush(); } } void ProducerTable::del(const string &key, const string &op, const string &prefix) { if (m_dumpFile.is_open()) { if (!m_firstItem) m_dumpFile << "," << endl; else m_firstItem = false; json j; string json_key = getKeyName(key); j[json_key] = json::object(); j["OP"] = op; m_dumpFile << j.dump(4); } enqueueDbChange(key, "{}", "D" + op, prefix); if (!m_buffered) { m_pipe->flush(); } } void ProducerTable::flush() { m_pipe->flush(); } }