common/zmqconsumerstatetable.cpp (87 lines of code) (raw):
#include <string>
#include <deque>
#include <limits>
#include <hiredis/hiredis.h>
#include <zmq.h>
#include <pthread.h>
#include "dbconnector.h"
#include "table.h"
#include "selectable.h"
#include "selectableevent.h"
#include "redisselect.h"
#include "redisapi.h"
#include "zmqconsumerstatetable.h"
#include "binaryserializer.h"
using namespace std;
namespace swss {
ZmqConsumerStateTable::ZmqConsumerStateTable(DBConnector *db, const std::string &tableName, ZmqServer &zmqServer, int popBatchSize, int pri, bool dbPersistence)
: Selectable(pri)
, TableBase(tableName, TableBase::getTableSeparator(db->getDbId()))
, m_db(db)
, m_zmqServer(zmqServer)
{
if (dbPersistence)
{
SWSS_LOG_DEBUG("Database persistence enabled, tableName: %s", tableName.c_str());
m_asyncDBUpdater = std::make_unique<AsyncDBUpdater>(db, tableName);
}
else
{
SWSS_LOG_DEBUG("Database persistence disabled, tableName: %s", tableName.c_str());
m_asyncDBUpdater = nullptr;
}
m_zmqServer.registerMessageHandler(m_db->getDbName(), tableName, this);
SWSS_LOG_DEBUG("ZmqConsumerStateTable ctor tableName: %s", tableName.c_str());
}
void ZmqConsumerStateTable::handleReceivedData(const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> &kcos)
{
for (auto kco : kcos)
{
std::shared_ptr<KeyOpFieldsValuesTuple> clone = nullptr;
if (m_asyncDBUpdater != nullptr)
{
// clone before put to received queue, because received data may change by consumer.
clone = std::make_shared<KeyOpFieldsValuesTuple>(*kco);
}
{
std::lock_guard<std::mutex> lock(m_receivedQueueMutex);
m_receivedOperationQueue.push(kco);
}
if (m_asyncDBUpdater != nullptr)
{
m_asyncDBUpdater->update(clone);
}
}
m_selectableEvent.notify(); // will release epoll
}
/* Get multiple pop elements */
void ZmqConsumerStateTable::pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string& /*prefix*/)
{
size_t count;
{
// size() is not thread safe
std::lock_guard<std::mutex> lock(m_receivedQueueMutex);
// For new data append to m_dataQueue during pops, will not be include in result.
count = m_receivedOperationQueue.size();
if (!count)
{
return;
}
}
vkco.clear();
for (size_t ie = 0; ie < count; ie++)
{
auto& kco = *(m_receivedOperationQueue.front());
vkco.push_back(std::move(kco));
{
std::lock_guard<std::mutex> lock(m_receivedQueueMutex);
m_receivedOperationQueue.pop();
}
}
}
size_t ZmqConsumerStateTable::dbUpdaterQueueSize()
{
if (m_asyncDBUpdater == nullptr)
{
throw system_error(make_error_code(errc::operation_not_supported),
"Database persistence is not enabled");
}
return m_asyncDBUpdater->queueSize();
}
}