common/subscriberstatetable.cpp (136 lines of code) (raw):

#include <string> #include <deque> #include <limits> #include <hiredis/hiredis.h> #include "dbconnector.h" #include "table.h" #include "selectable.h" #include "redisselect.h" #include "redisapi.h" #include "tokenize.h" #include "subscriberstatetable.h" using namespace std; namespace swss { SubscriberStateTable::SubscriberStateTable(DBConnector *db, const string &tableName, int popBatchSize, int pri) : ConsumerTableBase(db, tableName, popBatchSize, pri), m_table(db, tableName) { m_keyspace = "__keyspace@"; m_keyspace += to_string(db->getDbId()) + "__:" + tableName + m_table.getTableNameSeparator() + "*"; psubscribe(m_db, m_keyspace); vector<string> keys; m_table.getKeys(keys); for (const auto &key: keys) { KeyOpFieldsValuesTuple kco; kfvKey(kco) = key; kfvOp(kco) = SET_COMMAND; if (!m_table.get(key, kfvFieldsValues(kco))) { continue; } m_buffer.push_back(kco); } } uint64_t SubscriberStateTable::readData() { redisReply *reply = nullptr; /* Read data from redis. This call is non blocking. This method * is called from Select framework when data is available in socket. * NOTE: All data should be stored in event buffer. It won't be possible to * read them second time. */ if (redisGetReply(m_subscribe->getContext(), reinterpret_cast<void**>(&reply)) != REDIS_OK) { throw std::runtime_error("Unable to read redis reply"); } m_keyspace_event_buffer.emplace_back(make_shared<RedisReply>(reply)); /* Try to read data from redis cacher. * If data exists put it to event buffer. * NOTE: Keyspace event is not persistent and it won't * be possible to read it second time. If it is not stared in * the buffer it will be lost. */ reply = nullptr; int status; do { status = redisGetReplyFromReader(m_subscribe->getContext(), reinterpret_cast<void**>(&reply)); if(reply != nullptr && status == REDIS_OK) { m_keyspace_event_buffer.emplace_back(make_shared<RedisReply>(reply)); } } while(reply != nullptr && status == REDIS_OK); if (status != REDIS_OK) { throw std::runtime_error("Unable to read redis reply"); } return 0; } bool SubscriberStateTable::hasData() { return m_buffer.size() > 0 || m_keyspace_event_buffer.size() > 0; } bool SubscriberStateTable::hasCachedData() { return m_buffer.size() + m_keyspace_event_buffer.size() > 1; } void SubscriberStateTable::pops(deque<KeyOpFieldsValuesTuple> &vkco, const string& /*prefix*/) { vkco.clear(); if (!m_buffer.empty()) { vkco.insert(vkco.end(), m_buffer.begin(), m_buffer.end()); m_buffer.clear(); return; } while (auto event = popEventBuffer()) { KeyOpFieldsValuesTuple kco; /* if the Key-space notification is empty, try next one. */ auto message = event->getReply<RedisMessage>(); if (message.type.empty()) { continue; } /* The second element should be the original pattern matched */ auto ctx = event->getContext()->element[1]; if (message.pattern != m_keyspace) { SWSS_LOG_ERROR("invalid pattern %s returned for pmessage of %s", message.pattern.c_str(), m_keyspace.c_str()); continue; } string msg = message.channel; size_t pos = msg.find(':'); if (pos == msg.npos) { SWSS_LOG_ERROR("invalid format %s returned for pmessage of %s", msg.c_str(), m_keyspace.c_str()); continue; } string table_entry = msg.substr(pos + 1); pos = table_entry.find(m_table.getTableNameSeparator()); if (pos == table_entry.npos) { SWSS_LOG_ERROR("invalid key %s returned for pmessage of %s", ctx->str, m_keyspace.c_str()); continue; } string key = table_entry.substr(pos + 1); string op = message.data; if ("del" == op) { kfvKey(kco) = key; kfvOp(kco) = DEL_COMMAND; } else { if (!m_table.get(key, kfvFieldsValues(kco))) { SWSS_LOG_NOTICE("Miss table key %s, possibly outdated", table_entry.c_str()); continue; } kfvKey(kco) = key; kfvOp(kco) = SET_COMMAND; } vkco.push_back(kco); } m_keyspace_event_buffer.clear(); return; } shared_ptr<RedisReply> SubscriberStateTable::popEventBuffer() { if (m_keyspace_event_buffer.empty()) { return NULL; } auto reply = m_keyspace_event_buffer.front(); m_keyspace_event_buffer.pop_front(); return reply; } }