common/redisselect.cpp (82 lines of code) (raw):
#include <string>
#include <memory>
#include <hiredis/hiredis.h>
#include "dbconnector.h"
#include "redisreply.h"
#include "selectable.h"
#include "redisselect.h"
namespace swss {
RedisSelect::RedisSelect(int pri) : Selectable(pri), m_queueLength(-1)
{
}
int RedisSelect::getFd()
{
return m_subscribe->getContext()->fd;
}
const DBConnector* RedisSelect::getDbConnector() const
{
return m_subscribe.get();
}
uint64_t RedisSelect::readData()
{
redisReply *reply = nullptr;
if (redisGetReply(m_subscribe->getContext(), reinterpret_cast<void**>(&reply)) != REDIS_OK)
throw std::runtime_error("Unable to read redis reply from RedisSelect::readData() redisGetReply()");
freeReplyObject(reply);
m_queueLength++;
reply = nullptr;
int status;
do
{
status = redisGetReplyFromReader(m_subscribe->getContext(), reinterpret_cast<void**>(&reply));
if(reply != nullptr && status == REDIS_OK)
{
m_queueLength++;
freeReplyObject(reply);
}
}
while(reply != nullptr && status == REDIS_OK);
if (status != REDIS_OK)
{
throw std::runtime_error("Unable to read redis reply from RedisSelect::readData() redisGetReplyFromReader()");
}
return 0;
}
bool RedisSelect::hasData()
{
return m_queueLength > 0;
}
bool RedisSelect::hasCachedData()
{
return m_queueLength > 1;
}
bool RedisSelect::initializedWithData()
{
return m_queueLength > 0;
}
void RedisSelect::updateAfterRead()
{
m_queueLength--;
}
/* Create a new redisContext, SELECT DB and SUBSCRIBE */
void RedisSelect::subscribe(DBConnector* db, const std::string &channelName)
{
m_subscribe.reset(db->newConnector(SUBSCRIBE_TIMEOUT));
/* Send SUBSCRIBE #channel command */
m_subscribe->subscribe(channelName);
}
/* PSUBSCRIBE */
void RedisSelect::psubscribe(DBConnector* db, const std::string &channelName)
{
m_subscribe.reset(db->newConnector(SUBSCRIBE_TIMEOUT));
/*
* Send PSUBSCRIBE #channel command on the
* non-blocking subscriber DBConnector
*/
m_subscribe->psubscribe(channelName);
}
/* PUNSUBSCRIBE */
void RedisSelect::punsubscribe(const std::string &channelName)
{
/*
* Send PUNSUBSCRIBE #channel command on the
* non-blocking subscriber DBConnector
*/
if (m_subscribe)
{
m_subscribe->psubscribe(channelName);
}
}
void RedisSelect::setQueueLength(long long int queueLength)
{
m_queueLength = queueLength;
}
}