common/events_common.h (287 lines of code) (raw):
/* The internal code that caches runtime-IDs could retire upon de-init */
#ifndef _EVENTS_COMMON_H
#define _EVENTS_COMMON_H
/*
* common APIs used by events code.
*/
#include <stdio.h>
#include <chrono>
#include <fstream>
#include <errno.h>
#include <cxxabi.h>
#include "string.h"
#include <nlohmann/json.hpp>
#include "zmq.h"
#include <unordered_map>
#include <boost/serialization/vector.hpp>
#include <boost/serialization/map.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include "logger.h"
using namespace std;
using namespace chrono;
#define ERR_MESSAGE_INVALID -2
#define ERR_OTHER -1
/*
* Max count of possible concurrent event publishers
* We maintain a cache of last seen sequence number per publisher.
* This provides a MAX ceiling for cache.
* An exiting publisher retires its runtime-ID explicitly.
* A crashed publisher or event lost for any reason will leave
* behind the runtime ID. Overtime, these leaked IDs could fill the cache.
* Hence whenever the cache hits this max, old instances are removed.
* old instances are identified using time of last publish.
*
* In the scenario of too many publisher crashed and an instance that
* that does not publish for a very long time, could get purged.
* But crashing publishers is a bigger issue and we need not be
* perfect in that scenario.
*/
#define MAX_PUBLISHERS_COUNT 1000
#define RET_ON_ERR(res, msg, ...)\
if (!(res)) {\
int _e = errno; \
SWSS_LOG_INFO(msg, ##__VA_ARGS__); \
SWSS_LOG_INFO("last:errno=%d", _e); \
goto out; }
static const int LINGER_TIMEOUT = 100; /* Linger timeout in milliseconds */
/* helper API to print variable type */
/*
*Usage:
* const int ci = 0;
* std::cout << type_name<decltype(ci)>() << '\n';
*
* map<string, string> l;
* std::cout << type_name<decltype(l)>() << '\n';
*
* tt_t t;
* std::cout << type_name<decltype(t)>() << '\n';
* std::cout << type_name<decltype(tt_t)>() << '\n';
*/
template <typename T> string type_name();
template <class T>
string
type_name()
{
typedef typename remove_reference<T>::type TR;
unique_ptr<char, void(*)(void*)> own
(
abi::__cxa_demangle(typeid(TR).name(), nullptr,
nullptr, nullptr),
free
);
string r = own != nullptr ? own.get() : typeid(TR).name();
if (is_const<TR>::value)
r += " const";
if (is_volatile<TR>::value)
r += " volatile";
if (is_lvalue_reference<T>::value)
r += "&";
else if (is_rvalue_reference<T>::value)
r += "&&";
return r;
}
template <class T>
string
get_typename(T &val)
{
return type_name<decltype(val)>();
}
/* map to human readable str; Useful for error reporting. */
template <typename Map>
string
map_to_str(const Map &m)
{
stringstream _ss;
string sep;
_ss << "{";
for (const auto elem: m) {
_ss << sep << "{" << elem.first << "," << elem.second.substr(0,10) << "}";
if (sep.empty()) {
sep = ", ";
}
}
_ss << "}";
return _ss.str();
}
// Some simple definitions
//
typedef map<string, string> map_str_str_t;
/*
* Config that can be read from init_cfg
*/
#define INIT_CFG_PATH "/etc/sonic/init_cfg.json"
#define CFG_EVENTS_KEY "events"
/* configurable entities' keys */
/* zmq proxy's xsub & xpub end points */
#define XSUB_END_KEY "xsub_path"
#define XPUB_END_KEY "xpub_path"
/* Eventd service end point; All service req/resp occur via this path */
#define REQ_REP_END_KEY "req_rep_path"
/* Internal proxy to service path for capturing events for caching */
#define CAPTURE_END_KEY "capture_path"
#define STATS_UPD_SECS "stats_upd_secs"
#define CACHE_MAX_CNT "cache_max_cnt"
/* init config from file */
void read_init_config(const char *fname);
/* Read config entry for a key */
string get_config(const string key);
template<typename T>
T get_config_data(const string key, T def)
{
string s(get_config(key));
if (s.empty()) {
return def;
}
else {
T v;
stringstream ss(s);
ss >> v;
return v;
}
}
const string get_timestamp();
/*
* events are published as two part zmq message.
* First part only has the event source, so receivers could
* filter by source.
*
* Second part contains serialized form of map as defined in internal_event_t.
*/
/*
* This is data going over wire and using cache. So be conservative
* on names
*/
#define EVENT_STR_DATA "d"
#define EVENT_RUNTIME_ID "r"
#define EVENT_SEQUENCE "s"
#define EVENT_EPOCH "t"
typedef map<string, string> internal_event_t;
/* Sequence is converted to string in message */
typedef uint32_t sequence_t;
#define SEQUENCE_MAX UINT32_MAX
typedef string runtime_id_t;
/*
* internal_event_t internal_event_ref = {
* { EVENT_STR_DATA, "<json string of event>" },
* { EVENT_RUNTIME_ID, "<assigned runtime id of publisher>" },
* { EVENT_SEQUENCE, "<sequence number of event>" },
* { EVENT_EPOCH, "<epoch time at the point of publish>" } };
*/
/*
* Control messages could be sent as events with specific
* prefix "CONTROL_"
* e.g. CONTROL_DEINIT
*/
#define EVENT_STR_CTRL_PREFIX "CONTROL_"
#define EVENT_STR_CTRL_PREFIX_SZ ((int)sizeof(EVENT_STR_CTRL_PREFIX) - 1)
/* The internal code that caches runtime-IDs could retire upon de-init */
#define EVENT_STR_CTRL_DEINIT EVENT_STR_CTRL_PREFIX "DEINIT"
typedef vector<internal_event_t> internal_events_lst_t;
/* Cache maintains the part 2 of an event as serialized string. */
typedef string event_serialized_t; // events_data_type_t;
typedef vector<event_serialized_t> event_serialized_lst_t; // events_data_lst_t;
sequence_t str_to_seq(const string s);
string seq_to_str(sequence_t seq);
struct serialization
{
/*
* Way to serialize map or vector
* boost::archive::text_oarchive could be used to archive any struct/class
* but that class needs some additional support, that declares
* boost::serialization::access as private friend and couple more tweaks.
* The std::map & vector inherently supports serialization.
*/
template <typename Map>
int
serialize(const Map& data, string &s)
{
s.clear();
ostringstream _ser_ss;
try {
boost::archive::text_oarchive oarch(_ser_ss);
oarch << data;
s = _ser_ss.str();
return 0;
}
catch (exception& e) {
stringstream _ser_ex_ss;
_ser_ex_ss << e.what() << "map type:" << get_typename(data);
SWSS_LOG_ERROR("serialize Failed: %s", _ser_ex_ss.str().c_str());
return ERR_MESSAGE_INVALID;
}
}
template <typename Map>
int
deserialize(const string& s, Map& data)
{
try {
istringstream ss(s);
boost::archive::text_iarchive iarch(ss);
iarch >> data;
return 0;
}
catch (exception& e) {
stringstream _ss_ex;
_ss_ex << e.what() << "str[0:64]:(" << s.substr(0, 64) << ") data type: "
<< get_typename(data);
SWSS_LOG_ERROR("deserialize Failed: %s", _ss_ex.str().c_str());
return ERR_MESSAGE_INVALID;
}
}
template <typename Map>
int
map_to_zmsg(const Map& data, zmq_msg_t &msg)
{
string s;
int rc = serialize(data, s);
if (rc == 0) {
rc = zmq_msg_init_size(&msg, s.size());
}
if (rc == 0) {
strncpy((char *)zmq_msg_data(&msg), s.c_str(), s.size());
}
return rc;
}
template <typename Map>
int
zmsg_to_map(zmq_msg_t &msg, Map& data)
{
string s((const char *)zmq_msg_data(&msg), zmq_msg_size(&msg));
return deserialize(s, data);
}
template<typename DT>
int
zmq_read_part(void *sock, int flag, int &more, DT &data)
{
zmq_msg_t msg;
more = 0;
zmq_msg_init(&msg);
int rc = zmq_msg_recv(&msg, sock, flag);
if (rc == 1) {
char control_character = *(char*)zmq_msg_data(&msg);
if (control_character == 0x01 || control_character == 0x00) {
SWSS_LOG_INFO("Received subscription/unsubscription message when XSUB connect to XPUB: %c", control_character);
} else {
SWSS_LOG_DEBUG("Received non subscription based control character: %c", control_character);
}
rc = 0;
} else if (rc != -1) {
size_t more_size = sizeof (more);
zmq_getsockopt (sock, ZMQ_RCVMORE, &more, &more_size);
rc = zmsg_to_map(msg, data);
RET_ON_ERR(rc == 0, "Failed to deserialize part rc=%d", rc);
/* read more flag if message read fails to de-serialize */
} else {
/* override with zmq err */
rc = zmq_errno();
if (rc != 11) {
SWSS_LOG_INFO("Failure to read part rc=%d", rc);
}
}
out:
zmq_msg_close(&msg);
return rc;
}
template<typename DT>
int
zmq_send_part(void *sock, int flag, const DT &data)
{
zmq_msg_t msg;
int rc = map_to_zmsg(data, msg);
RET_ON_ERR(rc == 0, "Failed to map to zmsg %d", rc);
rc = zmq_msg_send (&msg, sock, flag);
if (rc == -1) {
/* override with zmq err */
rc = zmq_errno();
RET_ON_ERR(false, "Failed to send part %d", rc);
}
/* zmq_msg_send returns count of bytes sent */
rc = 0;
out:
zmq_msg_close(&msg);
return rc;
}
template<typename P1, typename P2>
int
zmq_message_send(void *sock, const P1 &pt1, const P2 &pt2)
{
int rc = zmq_send_part(sock, pt2.empty() ? 0 : ZMQ_SNDMORE, pt1);
/* send second part, only if first is sent successfully */
if ((rc == 0) && (!pt2.empty())) {
rc = zmq_send_part(sock, 0, pt2);
}
return rc;
}
template<typename P1, typename P2>
int
zmq_message_read(void *sock, int flag, P1 &pt1, P2 &pt2)
{
int more = 0, rc, rc2 = 0;
rc = zmq_read_part(sock, flag, more, pt1);
if (more) {
/*
* read second part if more is set, irrespective
* of any failure. More is set, only if sock is valid.
*/
rc2 = zmq_read_part(sock, 0, more, pt2);
}
RET_ON_ERR((rc == 0) || (rc == 11), "Failure to read part1 rc=%d", rc);
if (rc2 != 0) {
rc = rc2;
RET_ON_ERR(false, "Failed to read part2 rc=%d", rc);
}
if (more) {
rc = -1;
RET_ON_ERR(false, "Don't expect more than 2 parts, rc=%d", rc);
}
out:
return rc;
}
};
template <typename Map>
int
serialize(const Map& data, string &s)
{
auto render = boost::serialization::singleton<serialization>::get_const_instance();
return render.serialize(data, s);
}
template <typename Map>
int
deserialize(const string& s, Map& data)
{
auto render = boost::serialization::singleton<serialization>::get_const_instance();
return render.deserialize(s, data);
}
template<typename P1, typename P2>
int
zmq_message_send(void *sock, const P1 &pt1, const P2 &pt2)
{
auto render = boost::serialization::singleton<serialization>::get_const_instance();
return render.zmq_message_send(sock, pt1, pt2);
}
template<typename P1, typename P2>
int
zmq_message_read(void *sock, int flag, P1 &pt1, P2 &pt2)
{
auto render = boost::serialization::singleton<serialization>::get_const_instance();
return render.zmq_message_read(sock, flag, pt1, pt2);
}
/* Convert {<key>: < params >tttt a JSON string */
string convert_to_json(const string key, const map_str_str_t ¶ms);
/* Parse JSON string into {<key>: < params >} */
int convert_from_json(const string json_str, string &key, map_str_str_t ¶ms);
/*
* Cache drain timeout.
*
* When subscriber's de-init is called, it calls start cache service.
* When subscriber init is called, it calls cache stop service.
*
* In either scenario, an entity stops reading and let other start.
* The entity that stops may have to read little longer to drain any
* events in local ZMQ cache.
*
* This timeout helps with that.
*
* In case of subscriber de-init, the events read during this period
* is given to cache as start-up or initial stock.
* In case of init where cache service reads for this period, gives
* those as part of cache read and subscriber service will be diligent
* about reading the same event from the channel, hence duplicate
* for next one second.
*/
#define CACHE_DRAIN_IN_MILLISECS 1000
#endif /* !_EVENTS_COMMON_H */