in common/consumerstatetable.cpp [36:94]
void ConsumerStateTable::pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string& /*prefix*/)
{
RedisCommand command;
command.format(
"EVALSHA %s 3 %s %s%s %s %d %s",
m_shaPop.c_str(),
getKeySetName().c_str(),
getTableName().c_str(),
getTableNameSeparator().c_str(),
getDelKeySetName().c_str(),
POP_BATCH_SIZE,
getStateHashPrefix().c_str());
RedisReply r(m_db, command);
auto ctx0 = r.getContext();
vkco.clear();
// if the set is empty, return an empty kco object
if (ctx0->type == REDIS_REPLY_NIL)
{
return;
}
assert(ctx0->type == REDIS_REPLY_ARRAY);
size_t n = ctx0->elements;
vkco.resize(n);
for (size_t ie = 0; ie < n; ie++)
{
auto& kco = vkco[ie];
auto& values = kfvFieldsValues(kco);
assert(values.empty());
auto& ctx = ctx0->element[ie];
assert(ctx->element[0]->type == REDIS_REPLY_STRING);
std::string key(ctx->element[0]->str, ctx->element[0]->len);
kfvKey(kco) = key;
assert(ctx->element[1]->type == REDIS_REPLY_ARRAY);
auto ctx1 = ctx->element[1];
for (size_t i = 0; i < ctx1->elements / 2; i++)
{
FieldValueTuple e;
fvField(e).assign(ctx1->element[i * 2]->str, ctx1->element[i * 2]->len);
fvValue(e).assign(ctx1->element[i * 2 + 1]->str, ctx1->element[i * 2 + 1]->len);
values.push_back(e);
}
// if there is no field-value pair, the key is already deleted
if (values.empty())
{
kfvOp(kco) = DEL_COMMAND;
}
else
{
kfvOp(kco) = SET_COMMAND;
}
}
}