common/pubsub.cpp (120 lines of code) (raw):
#include "pubsub.h"
#include "dbconnector.h"
#include "logger.h"
#include "redisreply.h"
using namespace std;
using namespace swss;
PubSub::PubSub(DBConnector *parent)
: m_parentConnector(parent)
{
}
/* Subscribe to the given channel pattern and register this object with the
 * internal Select instance.
 *
 * When a subscriber connection already exists, this object is removed from
 * the Select instance first and added back once the new subscription has
 * been issued.
 *
 * pattern: channel pattern forwarded to RedisSelect::psubscribe(). */
void PubSub::psubscribe(const std::string &pattern)
{
    const bool alreadySubscribed = static_cast<bool>(m_subscribe);
    if (alreadySubscribed)
    {
        m_select.removeSelectable(this);
    }

    RedisSelect::psubscribe(m_parentConnector, pattern);
    m_select.addSelectable(this);
}
/* Cancel the subscription for the given channel pattern, then remove this
 * object from the internal Select instance.
 *
 * pattern: channel pattern forwarded to RedisSelect::punsubscribe(). */
void PubSub::punsubscribe(const std::string &pattern)
{
    RedisSelect::punsubscribe(pattern);

    m_select.removeSelectable(this);
}
uint64_t PubSub::readData()
{
redisReply *reply = nullptr;
/* Read data from redis. This call is non blocking. This method
* is called from Select framework when data is available in socket.
* NOTE: All data should be stored in event buffer. It won't be possible to
* read them second time. */
if (redisGetReply(m_subscribe->getContext(), reinterpret_cast<void**>(&reply)) != REDIS_OK)
{
throw std::runtime_error("Unable to read redis reply");
}
m_keyspace_event_buffer.emplace_back(make_shared<RedisReply>(reply));
/* Try to read data from redis cacher.
* If data exists put it to event buffer.
* NOTE: Keyspace event is not persistent and it won't
* be possible to read it second time. If it is not stared in
* the buffer it will be lost. */
reply = nullptr;
int status;
do
{
status = redisGetReplyFromReader(m_subscribe->getContext(), reinterpret_cast<void**>(&reply));
if(reply != nullptr && status == REDIS_OK)
{
m_keyspace_event_buffer.emplace_back(make_shared<RedisReply>(reply));
}
}
while(reply != nullptr && status == REDIS_OK);
if (status != REDIS_OK)
{
throw RedisError("Unable to redisGetReplyFromReader", m_subscribe->getContext());
}
return 0;
}
/* Report whether the event buffer holds at least one reply. */
bool PubSub::hasData()
{
    return !m_keyspace_event_buffer.empty();
}
/* Report whether the event buffer holds more than one reply, i.e. whether
 * something would still be buffered after a single reply is popped. */
bool PubSub::hasCachedData()
{
    const auto bufferedReplies = m_keyspace_event_buffer.size();
    return bufferedReplies > 1;
}
map<string, string> PubSub::get_message(double timeout, bool interrupt_on_signal)
{
return get_message_internal(timeout, interrupt_on_signal).second;
}
/* Wait for one message and return it together with the select result code.
 *
 * The returned pair holds:
 *   first  - Select::ERROR when there is no subscriber connection,
 *            otherwise the value returned by Select::select();
 *   second - the message fields "type", "pattern", "channel" and "data";
 *            left empty on timeout, on signal interruption, when there is
 *            no subscriber connection, or when the event buffer is empty.
 *
 * Throws RedisError when select reports Select::ERROR, and logic_error when
 * it returns a value other than ERROR, TIMEOUT, SIGNALINT or OBJECT.
 *
 * timeout:             maximum wait in seconds (multiplied by 1000 before
 *                      being passed to Select::select()).
 * interrupt_on_signal: forwarded unchanged to Select::select(). */
MessageResultPair PubSub::get_message_internal(double timeout, bool interrupt_on_signal)
{
    MessageResultPair result;

    /* Without a subscriber connection there is nothing to wait on. */
    if (!m_subscribe)
    {
        result.first = Select::ERROR;
        return result;
    }

    Selectable *ready = nullptr;
    const int timeoutMs = static_cast<int>(timeout * 1000);
    const int selectResult = m_select.select(&ready, timeoutMs, interrupt_on_signal);
    result.first = selectResult;

    if (selectResult == Select::ERROR)
    {
        throw RedisError("Failed to select", m_subscribe->getContext());
    }

    if (selectResult == Select::TIMEOUT || selectResult == Select::SIGNALINT)
    {
        return result;
    }

    if (selectResult != Select::OBJECT)
    {
        throw logic_error("Unexpected select result");
    }

    /* Select reported an object: take the oldest buffered reply, if any. */
    auto event = popEventBuffer();
    if (!event)
    {
        return result;
    }

    auto message = event->getReply<RedisMessage>();
    auto &fields = result.second;
    fields["type"] = message.type;
    fields["pattern"] = message.pattern;
    fields["channel"] = message.channel;
    fields["data"] = message.data;

    return result;
}
// Note: it is not straightforward to implement redis-py PubSub.listen() directly in c++
// due to the `yield` syntax, so we implement this function for blocking listen one message
std::map<std::string, std::string> PubSub::listen_message(bool interrupt_on_signal)
{
const double GET_MESSAGE_INTERVAL = 600.0; // in seconds
MessageResultPair ret;
for (;;)
{
ret = get_message_internal(GET_MESSAGE_INTERVAL, interrupt_on_signal);
if (!ret.second.empty() || ret.first == Select::SIGNALINT)
{
break;
}
}
return ret.second;
}
/* Remove the oldest reply from the event buffer and return it.
 *
 * Returns an empty shared_ptr when the buffer holds no reply. */
shared_ptr<RedisReply> PubSub::popEventBuffer()
{
    if (m_keyspace_event_buffer.empty())
    {
        /* nullptr rather than the NULL macro, matching the rest of the file. */
        return nullptr;
    }

    auto reply = m_keyspace_event_buffer.front();
    m_keyspace_event_buffer.pop_front();
    return reply;
}