common/producerstatetable.cpp (391 lines of code) (raw):
#include <stdlib.h>
#include <tuple>
#include <sstream>
#include <utility>
#include <algorithm>
#include "redisreply.h"
#include "table.h"
#include "redisapi.h"
#include "redispipeline.h"
#include "producerstatetable.h"
using namespace std;
namespace swss {
ProducerStateTable::ProducerStateTable(DBConnector *db, const string &tableName)
: ProducerStateTable(new RedisPipeline(db, 1), tableName, false, false)
{
m_pipeowned = true;
}
ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &tableName, bool buffered)
: ProducerStateTable(pipeline, tableName, buffered, false) {}
ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &tableName, bool buffered, bool flushPub)
: TableBase(tableName, SonicDBConfig::getSeparator(pipeline->getDBConnector()))
, TableName_KeySet(tableName)
, m_flushPub(flushPub)
, m_buffered(buffered)
, m_pipeowned(false)
, m_tempViewActive(false)
, m_pipe(pipeline)
{
reloadRedisScript();
string luaClear =
"redis.call('DEL', KEYS[1])\n"
"local keys = redis.call('KEYS', KEYS[2] .. '*')\n"
"for i,k in pairs(keys) do\n"
" redis.call('DEL', k)\n"
"end\n"
"redis.call('DEL', KEYS[3])\n";
m_shaClear = m_pipe->loadRedisScript(luaClear);
string luaApplyView = loadLuaScript("producer_state_table_apply_view.lua");
m_shaApplyView = m_pipe->loadRedisScript(luaApplyView);
}
ProducerStateTable::~ProducerStateTable()
{
if (m_pipeowned)
{
delete m_pipe;
}
}
void ProducerStateTable::reloadRedisScript()
{
// Set m_flushPub to remove publish from a single lua string and let pipeline do publish once per flush
// However, if m_buffered is false, follow the original one publish per lua design
// Hence we need to check both m_buffered and m_flushPub, and reload the redis script once setBuffered() changes m_buffered
/* 1. Inform the pipeline of what channel to publish, when flushPub feature is enabled */
if (m_buffered && m_flushPub)
m_pipe->addChannel(getChannelName(m_pipe->getDbId()));
/* 2. Setup lua strings: determine whether to attach luaPub after each lua string */
// num in luaSet and luaDel means number of elements that were added to the key set,
// not including all the elements already present into the set.
string luaSet =
"local added = redis.call('SADD', KEYS[2], ARGV[2])\n"
"for i = 0, #KEYS - 3 do\n"
" redis.call('HSET', KEYS[3 + i], ARGV[3 + i * 2], ARGV[4 + i * 2])\n"
"end\n";
string luaDel =
"local added = redis.call('SADD', KEYS[2], ARGV[2])\n"
"redis.call('SADD', KEYS[4], ARGV[2])\n"
"redis.call('DEL', KEYS[3])\n";
string luaBatchedSet =
"local added = 0\n"
"local idx = 2\n"
"for i = 0, #KEYS - 4 do\n"
" added = added + redis.call('SADD', KEYS[2], KEYS[4 + i])\n"
" for j = 0, tonumber(ARGV[idx]) - 1 do\n"
" local attr = ARGV[idx + j * 2 + 1]\n"
" local val = ARGV[idx + j * 2 + 2]\n"
" redis.call('HSET', KEYS[3] .. KEYS[4 + i], attr, val)\n"
" end\n"
" idx = idx + tonumber(ARGV[idx]) * 2 + 1\n"
"end\n";
string luaBatchedDel =
"local added = 0\n"
"for i = 0, #KEYS - 5 do\n"
" added = added + redis.call('SADD', KEYS[2], KEYS[5 + i])\n"
" redis.call('SADD', KEYS[3], KEYS[5 + i])\n"
" redis.call('DEL', KEYS[4] .. KEYS[5 + i])\n"
"end\n";
if (!m_flushPub || !m_buffered)
{
string luaPub =
"if added > 0 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
luaSet += luaPub;
luaDel += luaPub;
luaBatchedSet += luaPub;
luaBatchedDel += luaPub;
}
/* 3. load redis script based on the lua string */
m_shaSet = m_pipe->loadRedisScript(luaSet);
m_shaDel = m_pipe->loadRedisScript(luaDel);
m_shaBatchedSet = m_pipe->loadRedisScript(luaBatchedSet);
m_shaBatchedDel = m_pipe->loadRedisScript(luaBatchedDel);
}
void ProducerStateTable::setBuffered(bool buffered)
{
m_buffered = buffered;
reloadRedisScript();
}
void ProducerStateTable::set(const string &key, const vector<FieldValueTuple> &values,
const string &op /*= SET_COMMAND*/, const string &prefix)
{
if (m_tempViewActive)
{
// Write to temp view instead of DB
for (const auto& iv: values)
{
m_tempViewState[key][fvField(iv)] = fvValue(iv);
}
return;
}
// Assembly redis command args into a string vector
vector<string> args;
args.emplace_back("EVALSHA");
args.emplace_back(m_shaSet);
args.emplace_back(to_string(values.size() + 2));
args.emplace_back(getChannelName(m_pipe->getDbId()));
args.emplace_back(getKeySetName());
args.insert(args.end(), values.size(), getStateHashPrefix() + getKeyName(key));
args.emplace_back("G");
args.emplace_back(key);
for (const auto& iv: values)
{
args.emplace_back(fvField(iv));
args.emplace_back(fvValue(iv));
}
// Invoke redis command
RedisCommand command;
command.format(args);
m_pipe->push(command, REDIS_REPLY_NIL);
if (!m_buffered)
{
m_pipe->flush();
}
}
void ProducerStateTable::del(const string &key, const string &op /*= DEL_COMMAND*/, const string &prefix)
{
if (m_tempViewActive)
{
// Write to temp view instead of DB
m_tempViewState.erase(key);
return;
}
// Assembly redis command args into a string vector
vector<string> args;
args.emplace_back("EVALSHA");
args.emplace_back(m_shaDel);
args.emplace_back("4");
args.emplace_back(getChannelName(m_pipe->getDbId()));
args.emplace_back(getKeySetName());
args.emplace_back(getStateHashPrefix() + getKeyName(key));
args.emplace_back(getDelKeySetName());
args.emplace_back("G");
args.emplace_back(key);
args.emplace_back("''");
args.emplace_back("''");
// Invoke redis command
RedisCommand command;
command.format(args);
m_pipe->push(command, REDIS_REPLY_NIL);
if (!m_buffered)
{
m_pipe->flush();
}
}
void ProducerStateTable::set(const std::vector<KeyOpFieldsValuesTuple>& values)
{
if (m_tempViewActive)
{
// Write to temp view instead of DB
for (const auto &value : values)
{
const std::string &key = kfvKey(value);
for (const auto &iv : kfvFieldsValues(value))
{
m_tempViewState[key][fvField(iv)] = fvValue(iv);
}
}
return;
}
// Assembly redis command args into a string vector
vector<string> args;
args.emplace_back("EVALSHA");
args.emplace_back(m_shaBatchedSet);
args.emplace_back(to_string(values.size() + 3));
args.emplace_back(getChannelName(m_pipe->getDbId()));
args.emplace_back(getKeySetName());
args.emplace_back(getStateHashPrefix() + getTableName() + getTableNameSeparator());
for (const auto &value : values)
{
args.emplace_back(kfvKey(value));
}
args.emplace_back("G");
for (const auto &value : values)
{
args.emplace_back(to_string(kfvFieldsValues(value).size()));
for (const auto &iv : kfvFieldsValues(value))
{
args.emplace_back(fvField(iv));
args.emplace_back(fvValue(iv));
}
}
// Invoke redis command
RedisCommand command;
command.format(args);
m_pipe->push(command, REDIS_REPLY_NIL);
if (!m_buffered)
{
m_pipe->flush();
}
}
void ProducerStateTable::del(const std::vector<std::string>& keys)
{
if (m_tempViewActive)
{
// Write to temp view instead of DB
for (const auto &key : keys)
{
m_tempViewState.erase(key);
}
return;
}
// Assembly redis command args into a string vector
vector<string> args;
args.emplace_back("EVALSHA");
args.emplace_back(m_shaBatchedDel);
args.emplace_back(to_string(keys.size() + 4));
args.emplace_back(getChannelName(m_pipe->getDbId()));
args.emplace_back(getKeySetName());
args.emplace_back(getDelKeySetName());
args.emplace_back(getStateHashPrefix() + getTableName() + getTableNameSeparator());
for (const auto &key : keys)
{
args.emplace_back(key);
}
args.emplace_back("G");
// Invoke redis command
RedisCommand command;
command.format(args);
m_pipe->push(command, REDIS_REPLY_NIL);
if (!m_buffered)
{
m_pipe->flush();
}
}
void ProducerStateTable::flush()
{
m_pipe->flush();
}
int64_t ProducerStateTable::count()
{
RedisCommand cmd;
cmd.format("SCARD %s", getKeySetName().c_str());
RedisReply r = m_pipe->push(cmd);
r.checkReplyType(REDIS_REPLY_INTEGER);
return r.getReply<long long int>();
}
// Warning: calling this function will cause all data in keyset and the temporary table to be abandoned.
// ConsumerState may have got the notification from PUBLISH, but will see no data popped.
void ProducerStateTable::clear()
{
// Assembly redis command args into a string vector
vector<string> args;
args.emplace_back("EVALSHA");
args.emplace_back(m_shaClear);
args.emplace_back("3");
args.emplace_back(getKeySetName());
args.emplace_back(getStateHashPrefix() + getTableName());
args.emplace_back(getDelKeySetName());
// Invoke redis command
RedisCommand cmd;
cmd.format(args);
m_pipe->push(cmd, REDIS_REPLY_NIL);
m_pipe->flush();
}
void ProducerStateTable::create_temp_view()
{
if (m_tempViewActive)
{
SWSS_LOG_WARN("create_temp_view() called for table %s when another temp view is under work, %zd objects in existing temp view will be discarded.", getTableName().c_str(), m_tempViewState.size());
}
m_tempViewActive = true;
m_tempViewState.clear();
}
void ProducerStateTable::apply_temp_view()
{
if (!m_tempViewActive)
{
SWSS_LOG_THROW("apply_temp_view() called for table %s, however no temp view was created.", getTableName().c_str());
}
// Drop all pending operation first
clear();
TableDump currentState;
{
Table mainTable(m_pipe, getTableName(), false);
mainTable.dump(currentState);
}
// Print content of current view and temp view as debug log
SWSS_LOG_INFO("View switch of table %s required.", getTableName().c_str());
SWSS_LOG_INFO("Objects in current view:");
for (auto const & kfvPair : currentState)
{
SWSS_LOG_INFO(" %s: %zd fields;", kfvPair.first.c_str(), kfvPair.second.size());
}
SWSS_LOG_INFO("Objects in target view:");
for (auto const & kfvPair : m_tempViewState)
{
SWSS_LOG_INFO(" %s: %zd fields;", kfvPair.first.c_str(), kfvPair.second.size());
}
std::vector<std::string> keysToSet;
std::vector<std::string> keysToDel;
// Compare based on existing objects.
// Please note that this comparation is literal not contextual -
// e.g. {nexthop: 10.1.1.1, 10.1.1.2} and {nexthop: 10.1.1.2, 10.1.1.1} will be treated as different.
// Application will need to handle it, to make sure contextually identical field values also literally identical.
for (auto const & kfvPair : currentState)
{
const string& key = kfvPair.first;
const TableMap& fieldValueMap = kfvPair.second;
// DEL is needed if object does not exist in new state, or any field is not presented in new state
// SET is almost always needed, unless old state and new state exactly match each other
// (All old fields exists in new state, values match, and there is no additional field in new state)
if (m_tempViewState.find(key) == m_tempViewState.end()) // Key does not exist in new view
{
keysToDel.emplace_back(key);
keysToSet.emplace_back(key);
continue;
}
const TableMap& newFieldValueMap = m_tempViewState[key];
bool needDel = false;
bool needSet = false;
for (auto const& fvPair : fieldValueMap)
{
const string& field = fvPair.first;
const string& value = fvPair.second;
if (newFieldValueMap.find(field) == newFieldValueMap.end()) // Field does not exist in new view
{
needDel = true;
needSet = true;
break;
}
if (newFieldValueMap.at(field) != value) // Field value changed
{
needSet = true;
}
}
if (newFieldValueMap.size() > fieldValueMap.size()) // New field added
{
needSet = true;
}
if (needDel)
{
keysToDel.emplace_back(key);
}
if (needSet)
{
keysToSet.emplace_back(key);
}
else // If exactly match, no need to sync new state to StateHash in DB
{
m_tempViewState.erase(key);
}
}
// Objects that do not exist currently need to be created
for (auto const & kfvPair : m_tempViewState)
{
const string& key = kfvPair.first;
if (currentState.find(key) == currentState.end())
{
keysToSet.emplace_back(key);
}
}
// Assembly redis command args into a string vector
// See comment in producer_state_table_apply_view.lua for argument format
vector<string> args;
args.emplace_back("EVALSHA");
args.emplace_back(m_shaApplyView);
args.emplace_back(to_string(m_tempViewState.size() + 3));
args.emplace_back(getChannelName(m_pipe->getDbId()));
args.emplace_back(getKeySetName());
args.emplace_back(getDelKeySetName());
vector<string> argvs;
argvs.emplace_back("G");
argvs.emplace_back(to_string(keysToSet.size()));
argvs.insert(argvs.end(), keysToSet.begin(), keysToSet.end());
argvs.emplace_back(to_string(keysToDel.size()));
argvs.insert(argvs.end(), keysToDel.begin(), keysToDel.end());
for (auto const & kfvPair : m_tempViewState)
{
const string& key = kfvPair.first;
const TableMap& fieldValueMap = kfvPair.second;
args.emplace_back(getStateHashPrefix() + getKeyName(key));
argvs.emplace_back(to_string(fieldValueMap.size()));
for (auto const& fvPair : fieldValueMap)
{
const string& field = fvPair.first;
const string& value = fvPair.second;
argvs.emplace_back(field);
argvs.emplace_back(value);
}
}
args.insert(args.end(), argvs.begin(), argvs.end());
// Log arguments for debug
{
std::stringstream ss;
for (auto const & item : args)
{
ss << item << " ";
}
SWSS_LOG_DEBUG("apply_view.lua is called with following argument list: %s", ss.str().c_str());
}
// Invoke redis command
RedisCommand command;
command.format(args);
m_pipe->push(command, REDIS_REPLY_NIL);
m_pipe->flush();
// Clear state, temp view operation is now finished
m_tempViewState.clear();
m_tempViewActive = false;
}
}