common/zmqclient.cpp (158 lines of code) (raw):
#include <stdlib.h>
#include <tuple>
#include <sstream>
#include <utility>
#include <algorithm>
#include <chrono>
#include <cmath>
#include <exception>
#include <system_error>
#include <zmq.h>
#include "zmqclient.h"
#include "binaryserializer.h"
using namespace std;
namespace swss {
// Construct a client for `endpoint` with no VRF: forwards to the two-argument
// constructor with an empty vrf string, so connect() skips ZMQ_BINDTODEVICE.
ZmqClient::ZmqClient(const std::string& endpoint)
    : ZmqClient(endpoint, std::string())
{
}
// Construct a client for `endpoint`, optionally bound to network device/VRF
// `vrf` (empty means no binding). All member setup and the initial connect()
// happen in initialize(); connect() may throw if the endpoint is unreachable.
ZmqClient::ZmqClient(const std::string& endpoint, const std::string& vrf)
{
    this->initialize(endpoint, vrf);
}
// Release the ZMQ socket and then its context. The socket mutex is held so
// teardown cannot interleave with a send or reconnect running on another thread.
// A failing zmq_close is logged, never thrown (destructors must not throw).
ZmqClient::~ZmqClient()
{
    std::lock_guard<std::mutex> guard(m_socketMutex);

    // Short-circuit: zmq_close is only attempted when a socket exists.
    if (m_socket != nullptr && zmq_close(m_socket) != 0)
    {
        SWSS_LOG_ERROR("failed to close zmq socket, zmqerrno: %d",
                zmq_errno());
    }

    if (m_context != nullptr)
    {
        zmq_ctx_destroy(m_context);
    }
}
// Shared constructor body: record the target endpoint/VRF, mark the client as
// owning no ZMQ resources yet, size the reusable send buffer to
// MQ_RESPONSE_MAX_COUNT bytes, and then establish the connection.
void ZmqClient::initialize(const std::string& endpoint, const std::string& vrf)
{
    // Where (and through which VRF) to connect.
    m_endpoint = endpoint;
    m_vrf = vrf;

    // Nothing allocated yet; connect() creates the context and socket.
    m_socket = nullptr;
    m_context = nullptr;
    m_connected = false;

    // Serialization buffer reused by every sendMsg() call.
    m_sendbuffer.resize(MQ_RESPONSE_MAX_COUNT);

    connect();
}
bool ZmqClient::isConnected()
{
return m_connected;
}
// (Re)create the ZMQ context and PUSH socket and connect to m_endpoint.
//
// No-op when already connected. Otherwise any previous socket/context is torn
// down first, under m_socketMutex, so sendMsg() never sees a half-built socket.
// Throws (via SWSS_LOG_THROW) if the context or socket cannot be created or if
// zmq_connect fails; m_connected is set to true only on success.
// Socket-option failures are logged but not fatal, matching the original
// best-effort configuration.
void ZmqClient::connect()
{
    if (m_connected)
    {
        SWSS_LOG_DEBUG("Already connected to endpoint: %s", m_endpoint.c_str());
        return;
    }

    std::lock_guard<std::mutex> lock(m_socketMutex);

    // Drop any previous handles and null them, so no stale pointer survives if
    // creation below fails.
    if (m_socket)
    {
        int rc = zmq_close(m_socket);
        if (rc != 0)
        {
            SWSS_LOG_ERROR("failed to close zmq socket, zmqerrno: %d", zmq_errno());
        }
        m_socket = nullptr;
    }

    if (m_context)
    {
        zmq_ctx_destroy(m_context);
        m_context = nullptr;
    }

    // ZMQ Client/Server are n:1 mapping, so need use PUSH/PULL pattern http://api.zeromq.org/master:zmq-socket
    m_context = zmq_ctx_new();
    if (m_context == nullptr)
    {
        SWSS_LOG_THROW("failed to create zmq context for endpoint %s, zmqerrno: %d",
                m_endpoint.c_str(),
                zmq_errno());
    }

    m_socket = zmq_socket(m_context, ZMQ_PUSH);
    if (m_socket == nullptr)
    {
        SWSS_LOG_THROW("failed to create zmq socket for endpoint %s, zmqerrno: %d",
                m_endpoint.c_str(),
                zmq_errno());
    }

    // timeout all pending send package, so zmq will not block in dtor of this class: http://api.zeromq.org/master:zmq-setsockopt
    int linger = 0;
    if (zmq_setsockopt(m_socket, ZMQ_LINGER, &linger, sizeof(linger)) != 0)
    {
        SWSS_LOG_ERROR("failed to set ZMQ_LINGER for endpoint %s, zmqerrno: %d",
                m_endpoint.c_str(), zmq_errno());
    }

    // Increase send buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
    int high_watermark = MQ_WATERMARK;
    if (zmq_setsockopt(m_socket, ZMQ_SNDHWM, &high_watermark, sizeof(high_watermark)) != 0)
    {
        SWSS_LOG_ERROR("failed to set ZMQ_SNDHWM for endpoint %s, zmqerrno: %d",
                m_endpoint.c_str(), zmq_errno());
    }

    if (!m_vrf.empty())
    {
        // A failure here means traffic will NOT be bound to the requested VRF,
        // so make it visible instead of silently ignoring it.
        if (zmq_setsockopt(m_socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length()) != 0)
        {
            SWSS_LOG_ERROR("failed to bind zmq socket to vrf %s for endpoint %s, zmqerrno: %d",
                    m_vrf.c_str(), m_endpoint.c_str(), zmq_errno());
        }
    }

    SWSS_LOG_NOTICE("connect to zmq endpoint: %s", m_endpoint.c_str());
    int rc = zmq_connect(m_socket, m_endpoint.c_str());
    if (rc != 0)
    {
        m_connected = false;
        SWSS_LOG_THROW("failed to connect to zmq endpoint %s, zmqerrno: %d",
                m_endpoint.c_str(),
                zmq_errno());
    }

    m_connected = true;
}
void ZmqClient::sendMsg(
const std::string& dbName,
const std::string& tableName,
const std::vector<KeyOpFieldsValuesTuple>& kcos)
{
int serializedlen = (int)BinarySerializer::serializeBuffer(
m_sendbuffer.data(),
m_sendbuffer.size(),
dbName,
tableName,
kcos);
if (serializedlen >= MQ_RESPONSE_MAX_COUNT)
{
SWSS_LOG_THROW("ZmqClient sendMsg message was too big (buffer size %d bytes, got %d), reduce the message size, message DROPPED",
MQ_RESPONSE_MAX_COUNT,
serializedlen);
}
SWSS_LOG_DEBUG("sending: %d", serializedlen);
int zmq_err = 0;
int retry_delay = 10;
int rc = 0;
for (int i = 0; i <= MQ_MAX_RETRY; ++i)
{
{
// ZMQ socket is not thread safe: http://api.zeromq.org/2-1:zmq
std::lock_guard<std::mutex> lock(m_socketMutex);
// Use none block mode to use all bandwidth: http://api.zeromq.org/2-1%3Azmq-send
rc = zmq_send(m_socket, m_sendbuffer.data(), serializedlen, ZMQ_NOBLOCK);
}
if (rc >= 0)
{
SWSS_LOG_DEBUG("zmq sended %d bytes", serializedlen);
return;
}
zmq_err = zmq_errno();
// sleep (2 ^ retry time) * 10 ms
retry_delay *= 2;
if (zmq_err == EINTR
|| zmq_err== EFSM)
{
// EINTR: interrupted by signal
// EFSM: socket state not ready
// For example when ZMQ socket still not receive reply message from last sended package.
// There was state machine inside ZMQ socket, when the socket is not in ready to send state, this error will happen.
// for more detail, please check: http://api.zeromq.org/2-1:zmq-send
SWSS_LOG_DEBUG("zmq send retry, endpoint: %s, error: %d", m_endpoint.c_str(), zmq_err);
retry_delay = 0;
}
else if (zmq_err == EAGAIN)
{
// EAGAIN: ZMQ is full to need try again
SWSS_LOG_WARN("zmq is full, will retry in %d ms, endpoint: %s, error: %d", retry_delay, m_endpoint.c_str(), zmq_err);
}
else if (zmq_err == ETERM)
{
m_connected = false;
auto message = "zmq connection break, endpoint: " + m_endpoint + ", error: " + to_string(rc);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::connection_reset), message);
}
else
{
// for other error, send failed immediately.
auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::io_error), message);
}
usleep(retry_delay * 1000);
}
// failed after retry
auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::io_error), message);
}
}