meta/ZeroMQSelectableChannel.cpp (179 lines of code) (raw):

#include "ZeroMQSelectableChannel.h" #include "swss/logger.h" #include "swss/json.h" #include <zmq.h> #include <unistd.h> #define ZMQ_RESPONSE_BUFFER_SIZE (4*1024*1024) //#define ZMQ_POLL_TIMEOUT (2*60*1000) #define ZMQ_POLL_TIMEOUT (1000) using namespace sairedis; ZeroMQSelectableChannel::ZeroMQSelectableChannel( _In_ const std::string& endpoint): m_endpoint(endpoint), m_context(nullptr), m_socket(nullptr), m_fd(0), m_allowZmqPoll(false), m_runThread(true) { SWSS_LOG_ENTER(); SWSS_LOG_NOTICE("binding on %s", endpoint.c_str()); m_buffer.resize(ZMQ_RESPONSE_BUFFER_SIZE); m_context = zmq_ctx_new();; m_socket = zmq_socket(m_context, ZMQ_REP); int rc = zmq_bind(m_socket, endpoint.c_str()); if (rc != 0) { SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d", endpoint.c_str(), zmq_errno()); } size_t fd_len = sizeof(m_fd); rc = zmq_getsockopt(m_socket, ZMQ_FD, &m_fd, &fd_len); if (rc != 0) { SWSS_LOG_THROW("zmq_getsockopt failed on endpoint: %s, zmqerrno: %d", endpoint.c_str(), zmq_errno()); } m_zmlPollThread = std::make_shared<std::thread>(&ZeroMQSelectableChannel::zmqPollThread, this); } ZeroMQSelectableChannel::~ZeroMQSelectableChannel() { SWSS_LOG_ENTER(); m_runThread = false; m_allowZmqPoll = true; zmq_close(m_socket); zmq_ctx_destroy(m_context); SWSS_LOG_NOTICE("ending zmq poll thread for channel %s", m_endpoint.c_str()); m_zmlPollThread->join(); SWSS_LOG_NOTICE("ended zmq poll thread for channel %s", m_endpoint.c_str()); } void ZeroMQSelectableChannel::zmqPollThread() { SWSS_LOG_ENTER(); SWSS_LOG_NOTICE("begin"); while (m_runThread) { zmq_pollitem_t items [1] = { }; items[0].socket = m_socket; items[0].events = ZMQ_POLLIN; m_allowZmqPoll = false; int rc = zmq_poll(items, 1, ZMQ_POLL_TIMEOUT); if (m_runThread == false) { SWSS_LOG_NOTICE("ending pool thread, since run is false"); break; } if (rc <= 0 && zmq_errno() == ETERM) { SWSS_LOG_NOTICE("zmq_poll ETERM"); break; } if (rc == 0) { SWSS_LOG_DEBUG("zmq_poll: no events, continue"); continue; } // TODO we should have loop here in case we get multiple events since // zmq poll will only signal events once, but in our case we don't // expect multiple events, since we want to send/receive int zmq_events = 0; size_t zmq_events_len = sizeof(zmq_events); rc = zmq_getsockopt(m_socket, ZMQ_EVENTS, &zmq_events, &zmq_events_len); if (rc != 0) { SWSS_LOG_ERROR("zmq_getsockopt FAILED, zmq_errno: %d", zmq_errno()); break; } if (rc == 0 && zmq_events & ZMQ_POLLIN) { m_selectableEvent.notify(); // will release epoll while (m_runThread && !m_allowZmqPoll) { usleep(10); // could be increased or replaced by spin lock //SWSS_LOG_NOTICE("m_allowZmqPoll == false"); } } else { // should not happen, we only except ZMQ_POLLIN events SWSS_LOG_ERROR("unknown condition: rc: %d, zmq_events: %d, bug?", rc, zmq_events); break; } } SWSS_LOG_NOTICE("end"); } // SelectableChannel overrides bool ZeroMQSelectableChannel::empty() { SWSS_LOG_ENTER(); return m_queue.size() == 0; } void ZeroMQSelectableChannel::pop( _Out_ swss::KeyOpFieldsValuesTuple& kco, _In_ bool initViewMode) { SWSS_LOG_ENTER(); if (m_queue.empty()) { SWSS_LOG_THROW("queue is empty, can't pop"); } std::string msg = m_queue.front(); m_queue.pop(); auto& values = kfvFieldsValues(kco); values.clear(); swss::JSon::readJson(msg, values); swss::FieldValueTuple fvt = values.at(0); kfvKey(kco) = fvField(fvt); kfvOp(kco) = fvValue(fvt); values.erase(values.begin()); } void ZeroMQSelectableChannel::set( _In_ const std::string& key, _In_ const std::vector<swss::FieldValueTuple>& values, _In_ const std::string& op) { SWSS_LOG_ENTER(); std::vector<swss::FieldValueTuple> copy = values; swss::FieldValueTuple opdata(key, op); copy.insert(copy.begin(), opdata); std::string msg = swss::JSon::buildJson(copy); SWSS_LOG_DEBUG("sending: %s", msg.c_str()); int rc = zmq_send(m_socket, msg.c_str(), msg.length(), 0); // at this point we already did send/receive pattern, so we can notify // thread that we can poll again m_allowZmqPoll = true; if (rc <= 0) { SWSS_LOG_THROW("zmq_send failed, on endpoint %s, zmqerrno: %d: %s", m_endpoint.c_str(), zmq_errno(), zmq_strerror(zmq_errno())); } } // Selectable overrides int ZeroMQSelectableChannel::getFd() { SWSS_LOG_ENTER(); return m_selectableEvent.getFd(); } uint64_t ZeroMQSelectableChannel::readData() { SWSS_LOG_ENTER(); // clear selectable event so it could be triggered in next select() m_selectableEvent.readData(); int rc = zmq_recv(m_socket, m_buffer.data(), ZMQ_RESPONSE_BUFFER_SIZE, 0); if (rc < 0) { SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d", zmq_errno()); } if (rc >= ZMQ_RESPONSE_BUFFER_SIZE) { SWSS_LOG_THROW("zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED", ZMQ_RESPONSE_BUFFER_SIZE, rc); } m_buffer.at(rc) = 0; // make sure that we end string with zero before parse m_queue.push((char*)m_buffer.data()); return 0; } bool ZeroMQSelectableChannel::hasData() { SWSS_LOG_ENTER(); return m_queue.size() > 0; } bool ZeroMQSelectableChannel::hasCachedData() { SWSS_LOG_ENTER(); return m_queue.size() > 1; }