common/redispipeline.h (169 lines of code) (raw):

#pragma once #include <string> #include <queue> #include <unordered_set> #include <functional> #include <chrono> #include <iostream> #include "redisreply.h" #include "rediscommand.h" #include "dbconnector.h" #include "logger.h" #include "unistd.h" #include "sys/syscall.h" #define gettid() syscall(SYS_gettid) namespace swss { class RedisPipeline { public: const size_t COMMAND_MAX; static constexpr int NEWCONNECTOR_TIMEOUT = 0; RedisPipeline(const DBConnector *db, size_t sz = 128) : COMMAND_MAX(sz) , m_remaining(0) , m_shaPub("") { m_db = db->newConnector(NEWCONNECTOR_TIMEOUT); initializeOwnerTid(); lastHeartBeat = std::chrono::steady_clock::now(); } ~RedisPipeline() { if (m_ownerTid == gettid()) { // call flush from different thread will trigger race condition issue. flush(); } else { SWSS_LOG_NOTICE("RedisPipeline dtor is called from another thread, possibly due to exit(), Database: %s", getDbName().c_str()); } delete m_db; } redisReply *push(const RedisCommand& command, int expectedType) { switch (expectedType) { case REDIS_REPLY_NIL: case REDIS_REPLY_STATUS: case REDIS_REPLY_INTEGER: { int rc = command.appendTo(m_db->getContext()); if (rc != REDIS_OK) { // The only reason of error is REDIS_ERR_OOM (Out of memory) // ref: https://github.com/redis/hiredis/blob/master/hiredis.c throw std::bad_alloc(); } m_expectedTypes.push(expectedType); m_remaining++; mayflush(); return NULL; } default: { flush(); RedisReply r(m_db, command, expectedType); return r.release(); } } } redisReply *push(const RedisCommand& command) { flush(); RedisReply r(m_db, command); return r.release(); } std::string loadRedisScript(const std::string& script) { RedisCommand loadcmd; loadcmd.format("SCRIPT LOAD %s", script.c_str()); RedisReply r = push(loadcmd, REDIS_REPLY_STRING); std::string sha = r.getReply<std::string>(); return sha; } // The caller is responsible to release the reply object redisReply *pop() { if (m_remaining == 0) return NULL; redisReply *reply; int rc = redisGetReply(m_db->getContext(), (void**)&reply); if (rc != REDIS_OK) { throw RedisError("Failed to redisGetReply in RedisPipeline::pop", m_db->getContext()); } RedisReply r(reply); m_remaining--; int expectedType = m_expectedTypes.front(); m_expectedTypes.pop(); r.checkReplyType(expectedType); if (expectedType == REDIS_REPLY_STATUS) { r.checkStatusOK(); } return r.release(); } void flush() { lastHeartBeat = std::chrono::steady_clock::now(); if (m_remaining == 0) { return; } while(m_remaining) { // Construct an object to use its dtor, so that resource is released RedisReply r(pop()); } publish(); } size_t size() { return m_remaining; } int getDbId() { return m_db->getDbId(); } std::string getDbName() { return m_db->getDbName(); } DBConnector *getDBConnector() { return m_db; } void initializeOwnerTid() { m_ownerTid = gettid(); } void addChannel(std::string channel) { if (m_channels.find(channel) != m_channels.end()) return; m_channels.insert(channel); m_luaPub += "redis.call('PUBLISH', '" + channel + "', 'G')\n"; m_shaPub = loadRedisScript(m_luaPub); } int getIdleTime(std::chrono::time_point<std::chrono::steady_clock> tcurrent=std::chrono::steady_clock::now()) { return static_cast<int>(std::chrono::duration_cast<std::chrono::milliseconds>(tcurrent - lastHeartBeat).count()); } void publish() { if (m_shaPub.empty()) { return; } RedisCommand cmd; cmd.format( "EVALSHA %s 0", m_shaPub.c_str()); RedisReply r(m_db, cmd); } private: DBConnector *m_db; std::queue<int> m_expectedTypes; size_t m_remaining; long int m_ownerTid; std::string m_luaPub; std::string m_shaPub; std::chrono::time_point<std::chrono::steady_clock> lastHeartBeat; // marks the timestamp of latest pipeline flush being invoked std::unordered_set<std::string> m_channels; void mayflush() { if (m_remaining >= COMMAND_MAX) flush(); } }; }