common/notificationproducer.cpp (29 lines of code) (raw):
#include "notificationproducer.h"
// Value passed as the second RedisPipeline constructor argument when the
// producer creates its own pipeline from a DBConnector (non-buffered mode).
// NOTE(review): presumably a size of 1 makes every pushed command flush
// immediately -- confirm against RedisPipeline.
#define NON_BUFFERED_COMMAND_BUFFER_SIZE 1
/**
 * Construct a non-buffered producer on top of a database connection.
 *
 * The producer creates and owns its own RedisPipeline (sized with
 * NON_BUFFERED_COMMAND_BUFFER_SIZE) and publishes through it.
 *
 * @param db      Database connection handed to the internally created pipeline.
 * @param channel Name of the channel that send() publishes to.
 */
swss::NotificationProducer::NotificationProducer(
        swss::DBConnector *db,
        const std::string &channel)
    // m_pipe is derived from m_ownedpipe, so m_ownedpipe has to be initialized
    // first. NOTE(review): this relies on m_ownedpipe being declared before
    // m_pipe in the class definition -- confirm in the header.
    : m_ownedpipe(std::make_unique<swss::RedisPipeline>(db, NON_BUFFERED_COMMAND_BUFFER_SIZE))
    , m_pipe(m_ownedpipe.get())
    , m_channel(channel)
    , m_buffered(false)
{
}
/**
 * Construct a producer on top of an externally supplied pipeline.
 *
 * Only the raw pointer is stored; m_ownedpipe is left default-initialized, so
 * this producer does not create a pipeline of its own.
 * NOTE(review): the caller presumably has to keep `pipeline` alive for the
 * lifetime of this object -- confirm with the call sites.
 *
 * @param pipeline Pipeline used to push the PUBLISH commands.
 * @param channel  Name of the channel that send() publishes to.
 * @param buffered When true, send() pushes the command in buffered mode and
 *                 returns -1 instead of the PUBLISH reply.
 */
swss::NotificationProducer::NotificationProducer(
        swss::RedisPipeline *pipeline,
        const std::string &channel,
        bool buffered)
    : m_pipe(pipeline)
    , m_channel(channel)
    , m_buffered(buffered)
{
}
/**
 * Publish a notification on this producer's channel.
 *
 * The message is the JSON serialization of the (op, data) tuple followed by
 * all entries of `values`.
 *
 * @param op     Operation name; becomes the field of the first serialized tuple.
 * @param data   Operation data; becomes the value of the first serialized tuple.
 * @param values Additional field/value tuples. The vector is modified
 *               temporarily while the message is built (the (op, data) tuple
 *               is inserted at the front and removed again), so iterators and
 *               references into it may be invalidated. Its contents are the
 *               same on return as on entry, including when serialization
 *               throws.
 * @return -1 in buffered mode, because the reply is not available at this
 *         point; otherwise the integer reply of the PUBLISH command (the
 *         number of clients that received the message).
 */
int64_t swss::NotificationProducer::send(const std::string &op, const std::string &data, std::vector<FieldValueTuple> &values)
{
    SWSS_LOG_ENTER();

    FieldValueTuple opdata(op, data);

    // Prepend (op, data) in place instead of copying the whole vector.
    values.insert(values.begin(), opdata);

    std::string msg;
    try
    {
        msg = JSon::buildJson(values);
    }
    catch (...)
    {
        // Serialization failed: take the temporary leading tuple out again so
        // the caller's vector is not left modified, then propagate the error.
        values.erase(values.begin());
        throw;
    }

    values.erase(values.begin());

    SWSS_LOG_DEBUG("channel %s, publish: %s", m_channel.c_str(), msg.c_str());

    RedisCommand command;
    command.format("PUBLISH %s %s", m_channel.c_str(), msg.c_str());

    if (m_buffered)
    {
        m_pipe->push(command, REDIS_REPLY_INTEGER);

        // if operating in buffered mode return -1 as we can't know the number
        // of clients that have read the message immediately
        return -1;
    }

    RedisReply reply = m_pipe->push(command);
    // NOTE(review): presumably raises if the reply is not an integer --
    // confirm against RedisReply::checkReplyType.
    reply.checkReplyType(REDIS_REPLY_INTEGER);
    return reply.getReply<long long int>();
}