in common/asyncdbupdater.cpp [46:119]
void AsyncDBUpdater::dbUpdateThread()
{
SWSS_LOG_ENTER();
SWSS_LOG_NOTICE("dbUpdateThread begin");
// Different schedule policy has different min priority
pthread_attr_t attr;
int policy;
pthread_attr_getschedpolicy(&attr, &policy);
int min_priority = sched_get_priority_min(policy);
// Use min priority will block poll thread
pthread_setschedprio(pthread_self(), min_priority + 1);
// Follow same logic in ConsumerStateTable: every received data will write to 'table'.
DBConnector db(m_db->getDbName(), 0, true, m_db->getDBKey());
Table table(&db, m_tableName);
std::mutex cvMutex;
std::unique_lock<std::mutex> cvLock(cvMutex);
while (true)
{
size_t count;
count = queueSize();
if (count == 0)
{
// Check if there still data in queue before exit
if (!m_runThread)
{
SWSS_LOG_NOTICE("dbUpdateThread for table: %s is exiting", m_tableName.c_str());
break;
}
// when queue is empty, wait notification, when data come, continue to check queue size again
m_dbUpdateDataNotifyCv.wait(cvLock);
continue;
}
else
{
if (!m_runThread)
{
SWSS_LOG_DEBUG("dbUpdateThread for table: %s still has %d records that need to be sent before exiting", m_tableName.c_str(), (int)count);
}
}
for (size_t ie = 0; ie < count; ie++)
{
auto& kco = *(m_dbUpdateDataQueue.front());
if (kfvOp(kco) == SET_COMMAND)
{
auto& values = kfvFieldsValues(kco);
// Delete entry before Table::set(), because Table::set() does not remove the no longer existed fields from entry.
table.del(kfvKey(kco));
table.set(kfvKey(kco), values);
}
else if (kfvOp(kco) == DEL_COMMAND)
{
table.del(kfvKey(kco));
}
else
{
SWSS_LOG_ERROR("db: %s, table: %s receive unknown operation: %s", m_db->getDbName().c_str(), m_tableName.c_str(), kfvOp(kco).c_str());
}
{
std::lock_guard<std::mutex> lock(m_dbUpdateDataQueueMutex);
m_dbUpdateDataQueue.pop();
}
}
}
SWSS_LOG_DEBUG("AsyncDBUpdater dbUpdateThread end: %s", m_tableName.c_str());
}