common/zmqserver.h (42 lines of code) (raw):

#pragma once #include <string> #include <deque> #include <condition_variable> #include <vector> #include "table.h" #define MQ_RESPONSE_MAX_COUNT (16*1024*1024) #define MQ_SIZE 100 #define MQ_MAX_RETRY 10 #define MQ_POLL_TIMEOUT (1000) #define MQ_WATERMARK 10000 /***** ZMQ PORT *****/ static const int ORCH_ZMQ_PORT = 8020; namespace swss { class ZmqMessageHandler { public: virtual ~ZmqMessageHandler() {}; virtual void handleReceivedData(const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos) = 0; }; class ZmqServer { public: /* The default value of pop batch size is 128 */ static constexpr int DEFAULT_POP_BATCH_SIZE = 128; ZmqServer(const std::string& endpoint); ZmqServer(const std::string& endpoint, const std::string& vrf); ~ZmqServer(); void registerMessageHandler( const std::string dbName, const std::string tableName, ZmqMessageHandler* handler); private: void handleReceivedData(const char* buffer, const size_t size); void mqPollThread(); ZmqMessageHandler* findMessageHandler(const std::string dbName, const std::string tableName); std::vector<char> m_buffer; volatile bool m_runThread; std::shared_ptr<std::thread> m_mqPollThread; std::string m_endpoint; std::string m_vrf; std::map<std::string, std::map<std::string, ZmqMessageHandler*>> m_HandlerMap; }; }