common/zmqserver.cpp (134 lines of code) (raw):
#include <string>
#include <deque>
#include <limits>
#include <hiredis/hiredis.h>
#include <zmq.h>
#include <pthread.h>
#include "zmqserver.h"
#include "binaryserializer.h"
using namespace std;
namespace swss {
ZmqServer::ZmqServer(const std::string& endpoint)
: ZmqServer(endpoint, "")
{
}
ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf)
: m_endpoint(endpoint),
m_vrf(vrf)
{
m_buffer.resize(MQ_RESPONSE_MAX_COUNT);
m_runThread = true;
m_mqPollThread = std::make_shared<std::thread>(&ZmqServer::mqPollThread, this);
SWSS_LOG_DEBUG("ZmqServer ctor endpoint: %s", endpoint.c_str());
}
ZmqServer::~ZmqServer()
{
m_runThread = false;
m_mqPollThread->join();
}
void ZmqServer::registerMessageHandler(
const std::string dbName,
const std::string tableName,
ZmqMessageHandler* handler)
{
auto dbResult = m_HandlerMap.insert(pair<string, map<string, ZmqMessageHandler*>>(dbName, map<string, ZmqMessageHandler*>()));
if (dbResult.second) {
SWSS_LOG_DEBUG("ZmqServer add handler mapping for db: %s", dbName.c_str());
}
auto tableResult = dbResult.first->second.insert(pair<string, ZmqMessageHandler*>(tableName, handler));
if (tableResult.second) {
SWSS_LOG_DEBUG("ZmqServer register handler for db: %s, table: %s", dbName.c_str(), tableName.c_str());
}
}
ZmqMessageHandler* ZmqServer::findMessageHandler(
const std::string dbName,
const std::string tableName)
{
auto dbMappingIter = m_HandlerMap.find(dbName);
if (dbMappingIter == m_HandlerMap.end()) {
SWSS_LOG_DEBUG("ZmqServer can't find any handler for db: %s", dbName.c_str());
return nullptr;
}
auto tableMappingIter = dbMappingIter->second.find(tableName);
if (tableMappingIter == dbMappingIter->second.end()) {
SWSS_LOG_DEBUG("ZmqServer can't find handler for db: %s, table: %s", dbName.c_str(), tableName.c_str());
return nullptr;
}
return tableMappingIter->second;
}
void ZmqServer::handleReceivedData(const char* buffer, const size_t size)
{
std::string dbName;
std::string tableName;
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> kcos;
BinarySerializer::deserializeBuffer(buffer, size, dbName, tableName, kcos);
// find handler
auto handler = findMessageHandler(dbName, tableName);
if (handler == nullptr) {
SWSS_LOG_WARN("ZmqServer can't find handler for received message: %s", buffer);
return;
}
handler->handleReceivedData(kcos);
}
void ZmqServer::mqPollThread()
{
SWSS_LOG_ENTER();
SWSS_LOG_NOTICE("mqPollThread begin");
// Producer/Consumer state table are n:1 mapping, so need use PUSH/PULL pattern http://api.zeromq.org/master:zmq-socket
void* context = zmq_ctx_new();;
void* socket = zmq_socket(context, ZMQ_PULL);
// Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
int high_watermark = MQ_WATERMARK;
zmq_setsockopt(socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark));
if (!m_vrf.empty())
{
zmq_setsockopt(socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length());
}
int rc = zmq_bind(socket, m_endpoint.c_str());
if (rc != 0)
{
SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d, message: %s",
m_endpoint.c_str(),
zmq_errno(),
strerror(zmq_errno()));
}
// zmq_poll will use less CPU
zmq_pollitem_t poll_item;
poll_item.fd = 0;
poll_item.socket = socket;
poll_item.events = ZMQ_POLLIN;
poll_item.revents = 0;
SWSS_LOG_NOTICE("bind to zmq endpoint: %s", m_endpoint.c_str());
while (m_runThread)
{
// receive message
rc = zmq_poll(&poll_item, 1, 1000);
if (rc == 0 || !(poll_item.revents & ZMQ_POLLIN))
{
// timeout or other event
SWSS_LOG_DEBUG("zmq_poll timeout or invalied event rc: %d, revents: %d", rc, poll_item.revents);
continue;
}
// receive message
rc = zmq_recv(socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
if (rc < 0)
{
int zmq_err = zmq_errno();
SWSS_LOG_DEBUG("zmq_recv failed, endpoint: %s,zmqerrno: %d", m_endpoint.c_str(), zmq_err);
if (zmq_err == EINTR || zmq_err == EAGAIN)
{
continue;
}
else
{
SWSS_LOG_THROW("zmq_recv failed, endpoint: %s,zmqerrno: %d", m_endpoint.c_str(), zmq_err);
}
}
if (rc >= MQ_RESPONSE_MAX_COUNT)
{
SWSS_LOG_THROW("zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED",
MQ_RESPONSE_MAX_COUNT,
rc);
}
m_buffer.at(rc) = 0; // make sure that we end string with zero before parse
SWSS_LOG_DEBUG("zmq received %d bytes", rc);
// deserialize and write to redis:
handleReceivedData(m_buffer.data(), rc);
}
zmq_close(socket);
zmq_ctx_destroy(context);
SWSS_LOG_NOTICE("mqPollThread end");
}
}