void ZmqClient::connect()

in common/zmqclient.cpp [64:115]


void ZmqClient::connect()
{
    if (m_connected)
    {
        SWSS_LOG_DEBUG("Already connected to endpoint: %s", m_endpoint.c_str());
        return;
    }

    std::lock_guard<std::mutex> lock(m_socketMutex);
    if (m_socket)
    {
        int rc = zmq_close(m_socket);
        if (rc != 0)
        {
            SWSS_LOG_ERROR("failed to close zmq socket, zmqerrno: %d", zmq_errno());
        }
    }

    if (m_context)
    {
        zmq_ctx_destroy(m_context);
    }
    
    // ZMQ Client/Server are n:1 mapping, so need use PUSH/PULL pattern http://api.zeromq.org/master:zmq-socket
    m_context = zmq_ctx_new();
    m_socket = zmq_socket(m_context, ZMQ_PUSH);
    
    // timeout all pending send package, so zmq will not block in dtor of this class: http://api.zeromq.org/master:zmq-setsockopt
    int linger = 0;
    zmq_setsockopt(m_socket, ZMQ_LINGER, &linger, sizeof(linger));

    // Increase send buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
    int high_watermark = MQ_WATERMARK;
    zmq_setsockopt(m_socket, ZMQ_SNDHWM, &high_watermark, sizeof(high_watermark));

    if (!m_vrf.empty())
    {
        zmq_setsockopt(m_socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length());
    }

    SWSS_LOG_NOTICE("connect to zmq endpoint: %s", m_endpoint.c_str());
    int rc = zmq_connect(m_socket, m_endpoint.c_str());
    if (rc != 0)
    {
        m_connected = false;
        SWSS_LOG_THROW("failed to connect to zmq endpoint %s, zmqerrno: %d",
                m_endpoint.c_str(),
                zmq_errno());
    }

    m_connected = true;
}