common/asyncdbupdater.cpp (100 lines of code) (raw):

#include <string> #include <deque> #include <limits> #include <hiredis/hiredis.h> #include <pthread.h> #include "asyncdbupdater.h" #include "dbconnector.h" #include "redisselect.h" #include "redisapi.h" #include "table.h" using namespace std; namespace swss { AsyncDBUpdater::AsyncDBUpdater(DBConnector *db, const std::string &tableName) : m_db(db) , m_tableName(tableName) { m_runThread = true; m_dbUpdateThread = std::make_shared<std::thread>(&AsyncDBUpdater::dbUpdateThread, this); SWSS_LOG_DEBUG("AsyncDBUpdater ctor tableName: %s", tableName.c_str()); } AsyncDBUpdater::~AsyncDBUpdater() { m_runThread = false; // notify db update thread exit m_dbUpdateDataNotifyCv.notify_all(); m_dbUpdateThread->join(); SWSS_LOG_DEBUG("AsyncDBUpdater dtor tableName: %s", m_tableName.c_str()); } void AsyncDBUpdater::update(std::shared_ptr<KeyOpFieldsValuesTuple> pkco) { { std::lock_guard<std::mutex> lock(m_dbUpdateDataQueueMutex); m_dbUpdateDataQueue.push(pkco); } m_dbUpdateDataNotifyCv.notify_all(); } void AsyncDBUpdater::dbUpdateThread() { SWSS_LOG_ENTER(); SWSS_LOG_NOTICE("dbUpdateThread begin"); // Different schedule policy has different min priority pthread_attr_t attr; int policy; pthread_attr_getschedpolicy(&attr, &policy); int min_priority = sched_get_priority_min(policy); // Use min priority will block poll thread pthread_setschedprio(pthread_self(), min_priority + 1); // Follow same logic in ConsumerStateTable: every received data will write to 'table'. DBConnector db(m_db->getDbName(), 0, true, m_db->getDBKey()); Table table(&db, m_tableName); std::mutex cvMutex; std::unique_lock<std::mutex> cvLock(cvMutex); while (true) { size_t count; count = queueSize(); if (count == 0) { // Check if there still data in queue before exit if (!m_runThread) { SWSS_LOG_NOTICE("dbUpdateThread for table: %s is exiting", m_tableName.c_str()); break; } // when queue is empty, wait notification, when data come, continue to check queue size again m_dbUpdateDataNotifyCv.wait(cvLock); continue; } else { if (!m_runThread) { SWSS_LOG_DEBUG("dbUpdateThread for table: %s still has %d records that need to be sent before exiting", m_tableName.c_str(), (int)count); } } for (size_t ie = 0; ie < count; ie++) { auto& kco = *(m_dbUpdateDataQueue.front()); if (kfvOp(kco) == SET_COMMAND) { auto& values = kfvFieldsValues(kco); // Delete entry before Table::set(), because Table::set() does not remove the no longer existed fields from entry. table.del(kfvKey(kco)); table.set(kfvKey(kco), values); } else if (kfvOp(kco) == DEL_COMMAND) { table.del(kfvKey(kco)); } else { SWSS_LOG_ERROR("db: %s, table: %s receive unknown operation: %s", m_db->getDbName().c_str(), m_tableName.c_str(), kfvOp(kco).c_str()); } { std::lock_guard<std::mutex> lock(m_dbUpdateDataQueueMutex); m_dbUpdateDataQueue.pop(); } } } SWSS_LOG_DEBUG("AsyncDBUpdater dbUpdateThread end: %s", m_tableName.c_str()); } size_t AsyncDBUpdater::queueSize() { // size() is not thread safe std::lock_guard<std::mutex> lock(m_dbUpdateDataQueueMutex); return m_dbUpdateDataQueue.size(); } }