common/notificationconsumer.h (39 lines of code) (raw):
#ifndef __NOTIFICATIONCONSUMER__
#define __NOTIFICATIONCONSUMER__
#include <string>
#include <vector>
#include <queue>
#include <hiredis/hiredis.h>
#include "dbconnector.h"
#include "json.h"
#include "logger.h"
#include "redisreply.h"
#include "selectable.h"
#include "table.h"
namespace swss {
static constexpr size_t DEFAULT_NC_POP_BATCH_SIZE = 2048;
class NotificationConsumer : public Selectable
{
public:
NotificationConsumer(swss::DBConnector *db, const std::string &channel, int pri = 100, size_t popBatchSize = DEFAULT_NC_POP_BATCH_SIZE);
// Pop one or multiple data from the internal queue which fed from redis socket
// Note:
// Ensure data ready before popping, either by select or peek
void pop(std::string &op, std::string &data, std::vector<FieldValueTuple> &values);
void pops(std::deque<KeyOpFieldsValuesTuple> &vkco);
// Check the internal queue which fed from redis socket for data ready
// Returns:
// 1 - data immediately available inside internal queue, may be just fed from redis socket
// 0 - no data both in internal queue or redis socket
// -1 - error during peeking redis socket
int peek();
~NotificationConsumer() override;
int getFd() override;
uint64_t readData() override;
bool hasData() override;
bool hasCachedData() override;
const size_t POP_BATCH_SIZE;
private:
NotificationConsumer(const NotificationConsumer &other);
NotificationConsumer& operator = (const NotificationConsumer &other);
void processReply(redisReply *reply);
void subscribe();
swss::DBConnector *m_db;
swss::DBConnector *m_subscribe;
std::string m_channel;
std::queue<std::string> m_queue;
};
}
#endif // __NOTIFICATIONCONSUMER__