common/consumerstatetable.cpp (81 lines of code) (raw):
#include <string>
#include <deque>
#include <limits>
#include <hiredis/hiredis.h>
#include "dbconnector.h"
#include "table.h"
#include "selectable.h"
#include "redisselect.h"
#include "redisapi.h"
#include "consumerstatetable.h"
namespace swss {
ConsumerStateTable::ConsumerStateTable(DBConnector *db, const std::string &tableName, int popBatchSize, int pri)
: ConsumerTableBase(db, tableName, popBatchSize, pri)
, TableName_KeySet(tableName)
{
std::string luaScript = loadLuaScript("consumer_state_table_pops.lua");
m_shaPop = loadRedisScript(db, luaScript);
for (;;)
{
RedisReply watch(m_db, "WATCH " + getKeySetName(), REDIS_REPLY_STATUS);
watch.checkStatusOK();
multi();
enqueue(std::string("SCARD ") + getKeySetName(), REDIS_REPLY_INTEGER);
subscribe(m_db, getChannelName(m_db->getDbId()));
bool succ = exec();
if (succ) break;
}
RedisReply r(dequeueReply());
setQueueLength(r.getReply<long long int>());
}
void ConsumerStateTable::pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string& /*prefix*/)
{
RedisCommand command;
command.format(
"EVALSHA %s 3 %s %s%s %s %d %s",
m_shaPop.c_str(),
getKeySetName().c_str(),
getTableName().c_str(),
getTableNameSeparator().c_str(),
getDelKeySetName().c_str(),
POP_BATCH_SIZE,
getStateHashPrefix().c_str());
RedisReply r(m_db, command);
auto ctx0 = r.getContext();
vkco.clear();
// if the set is empty, return an empty kco object
if (ctx0->type == REDIS_REPLY_NIL)
{
return;
}
assert(ctx0->type == REDIS_REPLY_ARRAY);
size_t n = ctx0->elements;
vkco.resize(n);
for (size_t ie = 0; ie < n; ie++)
{
auto& kco = vkco[ie];
auto& values = kfvFieldsValues(kco);
assert(values.empty());
auto& ctx = ctx0->element[ie];
assert(ctx->element[0]->type == REDIS_REPLY_STRING);
std::string key(ctx->element[0]->str, ctx->element[0]->len);
kfvKey(kco) = key;
assert(ctx->element[1]->type == REDIS_REPLY_ARRAY);
auto ctx1 = ctx->element[1];
for (size_t i = 0; i < ctx1->elements / 2; i++)
{
FieldValueTuple e;
fvField(e).assign(ctx1->element[i * 2]->str, ctx1->element[i * 2]->len);
fvValue(e).assign(ctx1->element[i * 2 + 1]->str, ctx1->element[i * 2 + 1]->len);
values.push_back(e);
}
// if there is no field-value pair, the key is already deleted
if (values.empty())
{
kfvOp(kco) = DEL_COMMAND;
}
else
{
kfvOp(kco) = SET_COMMAND;
}
}
}
}