void ZmqClient::sendMsg()

in common/zmqclient.cpp [117:198]


void ZmqClient::sendMsg(
        const std::string& dbName,
        const std::string& tableName,
        const std::vector<KeyOpFieldsValuesTuple>& kcos)
{
    int serializedlen = (int)BinarySerializer::serializeBuffer(
                                                        m_sendbuffer.data(),
                                                        m_sendbuffer.size(),
                                                        dbName,
                                                        tableName,
                                                        kcos);

    if (serializedlen >= MQ_RESPONSE_MAX_COUNT)
    {
        SWSS_LOG_THROW("ZmqClient sendMsg message was too big (buffer size %d bytes, got %d), reduce the message size, message DROPPED",
                MQ_RESPONSE_MAX_COUNT,
                serializedlen);
    }

    SWSS_LOG_DEBUG("sending: %d", serializedlen);
    int zmq_err = 0;
    int retry_delay = 10;
    int rc = 0;
    for (int i = 0; i <=  MQ_MAX_RETRY; ++i)
    {
        {
            // ZMQ socket is not thread safe: http://api.zeromq.org/2-1:zmq
            std::lock_guard<std::mutex> lock(m_socketMutex);

            // Use none block mode to use all bandwidth: http://api.zeromq.org/2-1%3Azmq-send
            rc = zmq_send(m_socket, m_sendbuffer.data(), serializedlen, ZMQ_NOBLOCK);
        }

        if (rc >= 0)
        {
            SWSS_LOG_DEBUG("zmq sended %d bytes", serializedlen);
            return;
        }

        zmq_err = zmq_errno();
        // sleep (2 ^ retry time) * 10 ms
        retry_delay *= 2;
        if (zmq_err == EINTR
            || zmq_err== EFSM)
        {
            // EINTR: interrupted by signal
            // EFSM: socket state not ready
            //       For example when ZMQ socket still not receive reply message from last sended package.
            //       There was state machine inside ZMQ socket, when the socket is not in ready to send state, this error will happen.
            // for more detail, please check: http://api.zeromq.org/2-1:zmq-send
            SWSS_LOG_DEBUG("zmq send retry, endpoint: %s, error: %d", m_endpoint.c_str(), zmq_err);

            retry_delay = 0;
        }
        else if (zmq_err == EAGAIN)
        {
            // EAGAIN: ZMQ is full to need try again
            SWSS_LOG_WARN("zmq is full, will retry in %d ms, endpoint: %s, error: %d", retry_delay, m_endpoint.c_str(), zmq_err);
        }
        else if (zmq_err == ETERM)
        {
            m_connected = false;
            auto message =  "zmq connection break, endpoint: " + m_endpoint + ", error: " + to_string(rc);
            SWSS_LOG_ERROR("%s", message.c_str());
            throw system_error(make_error_code(errc::connection_reset), message);
        }
        else
        {
            // for other error, send failed immediately.
            auto message =  "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc);
            SWSS_LOG_ERROR("%s", message.c_str());
            throw system_error(make_error_code(errc::io_error), message);
        }

        usleep(retry_delay * 1000);
    }

    // failed after retry
    auto message =  "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen);
    SWSS_LOG_ERROR("%s", message.c_str());
    throw system_error(make_error_code(errc::io_error), message);
}