src/outmdsd/DataSender.h (39 lines of code) (raw):

#pragma once #ifndef __ENDPOINT_DATASENDER_H__ #define __ENDPOINT_DATASENDER_H__ #include <memory> #include <atomic> #include "LogItemPtr.h" namespace EndpointLog { template<typename T> class ConcurrentQueue; template<typename T> class ConcurrentMap; class SocketClient; /// This class will keep on sending incoming data in a shared queue to a /// socket server in a multi-thread system. Other threads will keep on /// pushing new data to the same shared queue (see BufferedLogger class). /// /// DataSender will run in an infinite loop to pop and send each item /// from the shared queue to socket server. If no item to pop, it will /// wait until there is item in the queue. /// /// To avoid message loss, each item can be optionally saved to a cache for /// future resend (see DataResender class). /// class DataSender { public: /// <summary> /// Constructor. /// <param name="sockClient"> socket client object</param> /// <param name="dataCache"> cache for data backup if not NULL. If NULL, no backup</param> /// <param name="incomingQueue"> data to be sent. DataSender will pop each item in the queue /// and send it to socket server. If no data to pop, it will wait until there is data to pop. /// </param> DataSender( const std::shared_ptr<SocketClient> & sockClient, const std::shared_ptr<ConcurrentMap<LogItemPtr>> & dataCache, const std::shared_ptr<ConcurrentQueue<LogItemPtr>> & incomingQueue ); ~DataSender(); // DataSender is not copyable but movable. DataSender(const DataSender& other) = delete; DataSender& operator=(const DataSender& other) = delete; DataSender(DataSender&& other) = default; DataSender& operator=(DataSender&& other) = default; /// <summary> /// Run sending process. It will run in an infinite loop until it is told to stop. /// It will pop and process each item in the incoming queue. If queue is empty, /// it will wait until queue has item. /// </summary> void Run(); /// <summary> /// Notify sender to stop. This is typically called in a thread different from /// the Run() thread. /// </summary> void Stop(); /// Return total number of send. including fails and successes. size_t GetNumSend() const { return m_numSend; } /// Return total number of successful send. size_t GetNumSuccess() const { return m_numSuccess; } private: /// Define interruption point for Run() loop. void InterruptPoint() const; /// Send a data string. It will send all chars until it hits terminal NUL. /// NUL is not sent. void Send(const char* data); private: std::shared_ptr<SocketClient> m_socketClient; std::shared_ptr<ConcurrentMap<LogItemPtr>> m_dataCache; // for data backup std::shared_ptr<ConcurrentQueue<LogItemPtr>> m_incomingQueue; // incoming data queue std::atomic<bool> m_stopSender{false}; // a flag to notify sender loop to stop. size_t m_numSend = 0; // number of items trying to be sent. This includes fails and successes. size_t m_numSuccess = 0; // number of success send. }; } // namespace #endif // __ENDPOINT_DATASENDER_H__