common/notificationconsumer.cpp (164 lines of code) (raw):
#include "notificationconsumer.h"
#include <iostream>
#include <deque>
#include "redisapi.h"
#define NOTIFICATION_SUBSCRIBE_TIMEOUT (1000)
#define REDIS_PUBLISH_MESSAGE_INDEX (2)
#define REDIS_PUBLISH_MESSAGE_ELEMNTS (3)
swss::NotificationConsumer::NotificationConsumer(swss::DBConnector *db, const std::string &channel, int pri, size_t popBatchSize):
Selectable(pri),
POP_BATCH_SIZE(popBatchSize),
m_db(db),
m_subscribe(NULL),
m_channel(channel)
{
SWSS_LOG_ENTER();
while (true)
{
try
{
subscribe();
break;
}
catch(...)
{
delete m_subscribe;
SWSS_LOG_ERROR("failed to subscribe on %s", m_channel.c_str());
}
}
}
swss::NotificationConsumer::~NotificationConsumer()
{
delete m_subscribe;
}
void swss::NotificationConsumer::subscribe()
{
SWSS_LOG_ENTER();
/* Create new new context to DB */
if (m_db->getContext()->connection_type == REDIS_CONN_TCP)
m_subscribe = new DBConnector(m_db->getDbId(),
m_db->getContext()->tcp.host,
m_db->getContext()->tcp.port,
NOTIFICATION_SUBSCRIBE_TIMEOUT);
else
m_subscribe = new DBConnector(m_db->getDbId(),
m_db->getContext()->unix_sock.path,
NOTIFICATION_SUBSCRIBE_TIMEOUT);
std::string s = "SUBSCRIBE " + m_channel;
RedisReply r(m_subscribe, s, REDIS_REPLY_ARRAY);
SWSS_LOG_INFO("subscribed to %s", m_channel.c_str());
}
int swss::NotificationConsumer::getFd()
{
return m_subscribe->getContext()->fd;
}
uint64_t swss::NotificationConsumer::readData()
{
SWSS_LOG_ENTER();
redisReply *reply = nullptr;
if (redisGetReply(m_subscribe->getContext(), reinterpret_cast<void**>(&reply)) != REDIS_OK)
{
SWSS_LOG_ERROR("failed to read redis reply on channel %s", m_channel.c_str());
throw std::runtime_error("Unable to read redis reply");
}
else
{
RedisReply r(reply);
processReply(reply);
}
reply = nullptr;
int status;
do
{
status = redisGetReplyFromReader(m_subscribe->getContext(), reinterpret_cast<void**>(&reply));
if(reply != nullptr && status == REDIS_OK)
{
RedisReply r(reply);
processReply(reply);
}
}
while(reply != nullptr && status == REDIS_OK);
if (status != REDIS_OK)
{
throw std::runtime_error("Unable to read redis reply");
}
return 0;
}
bool swss::NotificationConsumer::hasData()
{
return m_queue.size() > 0;
}
bool swss::NotificationConsumer::hasCachedData()
{
return m_queue.size() > 1;
}
void swss::NotificationConsumer::processReply(redisReply *reply)
{
SWSS_LOG_ENTER();
if (reply->type != REDIS_REPLY_ARRAY)
{
SWSS_LOG_ERROR("expected ARRAY redis reply on channel %s, got: %d", m_channel.c_str(), reply->type);
throw std::runtime_error("getRedisReply operation failed");
}
if (reply->elements != REDIS_PUBLISH_MESSAGE_ELEMNTS)
{
SWSS_LOG_ERROR("expected %d elements in redis reply on channel %s, got: %zu",
REDIS_PUBLISH_MESSAGE_ELEMNTS,
m_channel.c_str(),
reply->elements);
throw std::runtime_error("getRedisReply operation failed");
}
std::string msg = std::string(reply->element[REDIS_PUBLISH_MESSAGE_INDEX]->str);
SWSS_LOG_DEBUG("got message: %s", msg.c_str());
m_queue.push(msg);
}
void swss::NotificationConsumer::pop(std::string &op, std::string &data, std::vector<FieldValueTuple> &values)
{
SWSS_LOG_ENTER();
if (m_queue.empty())
{
SWSS_LOG_ERROR("notification queue is empty, can't pop");
throw std::runtime_error("notification queue is empty, can't pop");
}
std::string msg = m_queue.front();
m_queue.pop();
values.clear();
JSon::readJson(msg, values);
FieldValueTuple fvt = values.at(0);
op = fvField(fvt);
data = fvValue(fvt);
values.erase(values.begin());
}
void swss::NotificationConsumer::pops(std::deque<KeyOpFieldsValuesTuple> &vkco)
{
SWSS_LOG_ENTER();
vkco.clear();
while(!m_queue.empty())
{
while(!m_queue.empty())
{
std::string op;
std::string data;
std::vector<FieldValueTuple> values;
pop(op, data, values);
vkco.emplace_back(data, op, values);
}
// Too many popped, let's return to prevent DoS attach
if (vkco.size() >= POP_BATCH_SIZE)
return;
// Peek for more data in redis socket
int rc = swss::peekRedisContext(m_subscribe->getContext());
if (rc <= 0)
break;
readData();
}
}
int swss::NotificationConsumer::peek()
{
SWSS_LOG_ENTER();
if (m_queue.empty())
{
// Peek for more data in redis socket
int rc = swss::peekRedisContext(m_subscribe->getContext());
if (rc <= 0)
return rc;
// Feed into internal queue
readData();
}
return m_queue.empty() ? 0 : 1;
}