common/consumertable.cpp (85 lines of code) (raw):
#include <stdio.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <iostream>
#include <system_error>
#include "common/redisreply.h"
#include "common/consumertable.h"
#include "common/json.h"
#include "common/logger.h"
#include "common/redisapi.h"
using namespace std;
namespace swss {
ConsumerTable::ConsumerTable(DBConnector *db, const string &tableName, int popBatchSize, int pri)
: ConsumerTableBase(db, tableName, popBatchSize, pri)
, TableName_KeyValueOpQueues(tableName)
, m_modifyRedis(true)
{
std::string luaScript = loadLuaScript("consumer_table_pops.lua");
m_shaPop = loadRedisScript(db, luaScript);
for (;;)
{
RedisReply watch(m_db, string("WATCH ") + getKeyValueOpQueueTableName(), REDIS_REPLY_STATUS);
watch.checkStatusOK();
multi();
enqueue(string("LLEN ") + getKeyValueOpQueueTableName(), REDIS_REPLY_INTEGER);
subscribe(m_db, getChannelName(m_db->getDbId()));
enqueue(string("LLEN ") + getKeyValueOpQueueTableName(), REDIS_REPLY_INTEGER);
bool succ = exec();
if (succ) break;
}
RedisReply r(dequeueReply());
long long int len = r.getReply<long long int>();
//Key, Value and OP are in one list, they are processed in one shot
setQueueLength(len/3);
}
void ConsumerTable::setModifyRedis(bool modify)
{
SWSS_LOG_ENTER();
m_modifyRedis = modify;
}
void ConsumerTable::pops(deque<KeyOpFieldsValuesTuple> &vkco, const string &prefix)
{
RedisCommand command;
command.format(
"EVALSHA %s 2 %s %s %d %d",
m_shaPop.c_str(),
getKeyValueOpQueueTableName().c_str(),
(prefix+getTableName()).c_str(),
POP_BATCH_SIZE,
m_modifyRedis ? 1 : 0);
RedisReply r(m_db, command, REDIS_REPLY_ARRAY);
auto ctx0 = r.getContext();
vkco.clear();
// if the set is empty, return an empty kco object
if (r.getContext()->type == REDIS_REPLY_NIL)
{
return;
}
assert(ctx0->type == REDIS_REPLY_ARRAY);
size_t n = ctx0->elements;
vkco.resize(n);
for (size_t ie = 0; ie < n; ie++)
{
auto& kco = vkco[ie];
auto& values = kfvFieldsValues(kco);
assert(values.empty());
auto& ctx = ctx0->element[ie];
string key = ctx->element[0]->str;
kfvKey(kco) = key;
string op = ctx->element[1]->str;
kfvOp(kco) = op;
for (size_t i = 2; i < ctx->elements; i += 2)
{
if (i+1 >= ctx->elements)
{
SWSS_LOG_ERROR("invalid number of elements in returned table: %zu >= %zu", i+1, ctx->elements);
throw runtime_error("invalid number of elements in returned table");
}
FieldValueTuple e;
fvField(e) = ctx->element[i+0]->str;
fvValue(e) = ctx->element[i+1]->str;
values.push_back(e);
}
}
}
}