in common/subscriberstatetable.cpp [95:165]
/*
 * Collect pending table changes into vkco.
 *
 * vkco is cleared first. If m_buffer holds entries, they are handed out
 * as-is (and m_buffer is emptied) and the function returns without looking
 * at any event. Otherwise every event returned by popEventBuffer() is parsed
 * into a (key, op, field-values) tuple:
 *   - events with an empty message type, a pattern different from
 *     m_keyspace, or a channel that cannot be split into table and key are
 *     skipped (the latter two with an error log);
 *   - data equal to "del" yields a DEL_COMMAND entry with no field-values;
 *   - anything else yields a SET_COMMAND entry whose field-values are read
 *     from m_table; if the key cannot be read the event is skipped.
 *
 * The second parameter (prefix) is unused.
 */
void SubscriberStateTable::pops(deque<KeyOpFieldsValuesTuple> &vkco, const string& /*prefix*/)
{
    vkco.clear();

    /* Previously buffered entries take priority over new notifications. */
    if (!m_buffer.empty())
    {
        vkco.insert(vkco.end(), m_buffer.begin(), m_buffer.end());
        m_buffer.clear();
        return;
    }

    while (auto event = popEventBuffer())
    {
        KeyOpFieldsValuesTuple kco;

        /* if the Key-space notification is empty, try next one. */
        auto message = event->getReply<RedisMessage>();
        if (message.type.empty())
        {
            continue;
        }

        /* The pattern reported back must be the one we subscribed with. */
        if (message.pattern != m_keyspace)
        {
            SWSS_LOG_ERROR("invalid pattern %s returned for pmessage of %s", message.pattern.c_str(), m_keyspace.c_str());
            continue;
        }

        /* Everything after the first ':' of the channel is the table entry. */
        const string &msg = message.channel;
        size_t pos = msg.find(':');
        if (pos == msg.npos)
        {
            SWSS_LOG_ERROR("invalid format %s returned for pmessage of %s", msg.c_str(), m_keyspace.c_str());
            continue;
        }

        /* The key follows the table name separator inside the table entry. */
        const string table_entry = msg.substr(pos + 1);
        pos = table_entry.find(m_table.getTableNameSeparator());
        if (pos == table_entry.npos)
        {
            /*
             * Report the entry that failed to parse; the pattern is already
             * printed as the second argument.
             */
            SWSS_LOG_ERROR("invalid key %s returned for pmessage of %s", table_entry.c_str(), m_keyspace.c_str());
            continue;
        }

        const string key = table_entry.substr(pos + 1);
        const string &op = message.data;

        if ("del" == op)
        {
            kfvKey(kco) = key;
            kfvOp(kco) = DEL_COMMAND;
        }
        else
        {
            /* Any other operation is reported as a SET with the current table content. */
            if (!m_table.get(key, kfvFieldsValues(kco)))
            {
                SWSS_LOG_NOTICE("Miss table key %s, possibly outdated", table_entry.c_str());
                continue;
            }
            kfvKey(kco) = key;
            kfvOp(kco) = SET_COMMAND;
        }

        vkco.push_back(kco);
    }

    m_keyspace_event_buffer.clear();

    return;
}