common/events.h (28 lines of code) (raw):

#ifndef _EVENTS_H #define _EVENTS_H #include <string> #include <vector> #include <map> /* * Events library APIs. * * APIs are for publishing & receiving events with source, tag and params along with timestamp. * Used by event publishers and those interested in receiving published events. * Publishers are multiple run from different contexts, as processes running in hosts & containers. * Receiver are often few. Telmetry container runs a receiver. * */ /* Handle for a publisher / subscriber instance */ typedef void* event_handle_t; /* * Initialize an event publisher instance for an event source. * * A single publisher instance is maintained for a source. * Any duplicate init call for a source will return the same instance. * * NOTE: * The initialization occurs asynchronously. * Any event published before init is complete, is blocked until the init. * is complete. Hence recommend, do the init as soon as the process starts. * * Input: * event_source: * The YANG module name for the event source. All events published with the handle * returned by this call is tagged with this source, transparently. The receiver * could subscribe with this source as filter. * * Return * Non NULL handle * NULL on failure */ event_handle_t events_init_publisher(const std::string event_source); /* * De-init/free the publisher * * Input: * Handle returned from events_init_publisher. * * Output: * Handle is nullified. */ void events_deinit_publisher(event_handle_t handle); /* * List of event params */ typedef std::map<std::string, std::string> event_params_t; /* * timestamp param name */ #define EVENT_TS_PARAM "timestamp" /* * Publish an event * * Internally a globally unique sequence number is embedded in every published event, * The sequence numbers from same publishing instances can be compared * to see if there any missing events between. * * The sequence has two components as run-time-id that distinguishes * the running instance of a publisher and other a running sequence * starting from 0, which is local to this runtime-id. * * The receiver API keep last received number for each runtime id * and use this info to compute missed event count upon next event. * * input: * handle - As obtained from events_init_publisher for a event-source. * * event_tag - * Name of the YANG container that defines this event in the * event-source module associated with this handle. * * YANG path formatted as "< event_source >:< event_tag >" * e.g. {"sonic-events-bgp:bgp-state": { "ip": "10.10.10.10", ...}} * * params - * Params associated with event; This may or may not contain * timestamp. In the absence, the timestamp is added, transparently. * The timestamp should be per rfc3339 * e.g. "2022-08-17T02:39:21.286611Z" * * return: * 0 - On success * > 0 - On failure, returns zmq_errno, if failure is zmq socket related. * < 0 - For all other failures */ /* * A sanity check on final JSON string size of event * An error log will be written for too big events for alert. */ #define EVENT_MAXSZ 1024 int event_publish(event_handle_t handle, const std::string event_tag, const event_params_t *params=NULL); /* * Initialize subscriber. * Init subscriber, optionally to filter by event-source. * * Input: * use_cache * When set to true, it will make use of the cache service transparently. * The cache service caches events during session down time. The deinit * start the caching and init call stops the caching. * default: false * * recv_timeout * Read blocks by default until an event is available for read. * 0 - Returns immediately, with or w/o event * -1 - Default; Blocks until an event is available for read * N - Count ofd milliseconds to wait for an event. * * * lst_subscribe_sources_t * List of subscription sources of interest. * The source value is the corresponding YANG module name. * e.g. "sonic-events-bgp " is the source modulr name for bgp. * default: All sources, if none provided. * * Return: * Non NULL handle on success * NULL on failure */ typedef std::vector<std::string> event_subscribe_sources_t; event_handle_t events_init_subscriber(bool use_cache=false, int recv_timeout = -1, const event_subscribe_sources_t *sources=NULL); /* * De-init/free the subscriber * * Input: * Handle returned from events_init_subscriber * * Output: * Handle is nullified. */ void events_deinit_subscriber(event_handle_t handle); typedef struct { std::string key; /* key */ event_params_t params; /* Params received */ uint32_t missed_cnt; /* missed count */ int64_t publish_epoch_ms; /* Epoch time in milliseconds */ } event_receive_op_t; /* * Receive an event. * A blocking call unless the subscriber is created with a timeout. * * This API maintains an expected sequence number and use the received * sequence in event to compute missed events count. The missed count * provides the count of events missed from this sender. * * input: * handle - As obtained from events_init_subscriber * * output: * evt : * The entire received event. * * return: * 0 - On success * > 0 - Implies failure due to timeout. * < 0 - For all other failures * */ int event_receive(event_handle_t handle, event_receive_op_t &evt); /* To receive as JSON */ int event_receive_json(event_handle_t handle, std::string &evt, uint32_t &missed_cnt, int64_t &publish_epoch_ms); #endif /* !_EVENTS_H */