#ifndef _EVENTS_SERVICE_H
#define _EVENTS_SERVICE_H
#include "string.h"
#include "events_common.h"
/*
 * The eventd daemon runs an event service that supports caching &
 * miscellaneous services as required by the events API.
 *
 * This header lists the services provided.
 *
 * These services are only used by the events API internally.
 *
 * All services use the REQ/REP pattern between the caller and the
 * centralized event service. The return code is provided by the
 * service in its reply.
 *
 * A request goes as a single part message carrying event_req_t,
 * unless there is data associated.
 *
 * Both req & resp are either single or two part. When no data is
 * associated, the message is just the request code or the response
 * return value in one part. When there is data, it comes in the
 * second part.
 *
 * The data is one or more strings, sent as a serialized vector of
 * strings (event_serialized_lst_t).
 *
 * In case of echo, part 2 is the vector holding the single string
 * provided in the request.
 *
 * The cache-read returns cached events as a vector of serialized
 * strings.
 * NOTE: The cache only saves part 2 of the published event.
 */
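/*
 * For illustration, a hypothetical ECHO exchange over this wire
 * format would look as below (a sketch only; the exact
 * serialization helpers live in events_common.h):
 *
 *   request:  part 1: EVENT_ECHO (request code as int)
 *             part 2: serialized event_serialized_lst_t {"hello"}
 *   response: part 1: 0 (return code)
 *             part 2: serialized event_serialized_lst_t {"hello"}
 */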
/* List of request codes for supported commands */
typedef enum {
EVENT_CACHE_INIT, /* Init the cache before start */
EVENT_CACHE_START, /* Start caching all published events */
EVENT_CACHE_STOP, /* Stop the cache */
EVENT_CACHE_READ, /* Read cached events */
EVENT_ECHO, /* Echoes the received data in request via response */
EVENT_OPTIONS, /* Global options set/get */
EVENT_EXIT /* Exit the eventd service -- Useful for unit test.*/
} event_req_type_t;
/*
 * Internal service init & APIs for read & write.
 */
/*
 * An internal service is provided for cache handling & miscellaneous
 * needs.
 *
 * Clients initialize via init_client and use the provided services;
 * server code initializes via init_server and uses the read & write
 * APIs.
 */
class event_service {
public:
event_service(): m_socket(NULL) {}
~event_service() { close_service(); }
/*
* Init the service for client or server.
* The client uses REQ socket & connects.
* The server uses REP socket & bind.
*
 * block_ms sets the timeout for any read.
 * Publishing clients use the ECHO service send/recv to shadow
 * their async connection to the XPUB end point, but the eventd
 * service could be down; a timeout ensures recv does not block
 * forever.
 * Publish clients that choose to block may specify the duration.
*
* Input:
* zmq_ctx - Context to use
* block_ms
* 0 - Return immediately
* <N>- Count of millisecs to wait for a message.
* -1 - Block until a message is available or any fatal failure.
*
* return:
* 0 - On success
 * -1 - On failure. zerrno is set to EAGAIN if timed out;
 *      else the appropriate error code is set as per zmq_msg_recv.
*/
int init_client(void *zmq_ctx, int block_ms = -1);
int init_server(void *zmq_ctx, int block_ms = -1);
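/*
 * Illustrative client init (a sketch with a hypothetical caller;
 * assumes a context created via zmq_ctx_new()):
 *
 *   void *zmq_ctx = zmq_ctx_new();
 *   event_service svc;
 *   if (svc.init_client(zmq_ctx, 1000) != 0) {  // cap reads at 1s
 *       // handle failure; a later recv that times out sets
 *       // zerrno to EAGAIN
 *   }
 */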
/*
 * The event cache service is a singleton service.
*
* Usage:
* Init - Initiates the connection
* start - Start reading & caching
* stop - Stop reading & disconnect.
* read - Read the cached events.
*
* Any duplicate start has no impact.
* The cached events can be read only upon stop. A read before
* stop returns failure. A read w/o a start succeeds with no message.
*
*/
/*
 * Called to init event caching.
*
* This is transparently called by events_deinit_subscriber, if cache service
* was enabled. This simply triggers a connect request and does not start
* reading yet.
* NOTE: ZMQ connects asynchronously.
*
* return:
* 0 - On success.
* 1 - Already running
* -1 - On failure.
*/
int cache_init();
/*
* Called to start caching events
*
* This is transparently called by events_deinit_subscriber after init.
 * The deinit call may provide events from its local cache.
 * The caching service uses these as its initial/startup stock.
*
* input:
* lst - i/p events from caller's cache.
*
* return:
* 0 - On success.
* 1 - Already running
* -1 - On failure.
*/
int cache_start(const event_serialized_lst_t &lst);
/*
* Called to stop caching events
*
* This is transparently called by events_init_subscriber, if cache service
* is enabled.
*
* return:
* 0 - On success.
* 1 - Not running
* -1 - On failure.
*/
int cache_stop();
/*
* cache read
*
* This is transparently called by event_receive, if cache service
* is enabled.
 * Each event is received as 2 parts. The first part serves as a
 * subscription filter and is hence dropped. The second part is
 * returned as a serialized string in event_serialized_lst_t.
* The string is the serialized form of internal_event_t.
*
* An empty o/p implies no more.
*
* Internal details:
*
 * The cache service caches all events until buffer overflow or the
 * max ceiling is reached.
 *
 * Upon overflow, it creates a separate cache, where it keeps only
 * the last event received per runtime ID.
 * This is required for the receiver to be able to compute the
 * missed count.
 *
 * The receiver API computes the missed count the same way for
 * events read from the subscription channel as for those read
 * from the cache.
*
* output:
* lst - A set of events, with a max cap.
* Hence multiple reads may be required to read all.
* An empty list implies no more event in cache.
*
* return:
* 0 - On success.
* -1 - On failure.
*/
int cache_read(event_serialized_lst_t &lst);
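/*
 * Illustrative drain of the cache after stop (a sketch; assumes
 * event_serialized_lst_t behaves as a vector of strings, per the
 * note at the top of this file):
 *
 *   event_serialized_lst_t lst;
 *   while (svc.cache_read(lst) == 0 && !lst.empty()) {
 *       for (const auto &evt : lst) {
 *           // evt is the serialized internal_event_t
 *       }
 *       lst.clear();  // read the next batch, until an empty list
 *   }
 */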
/*
* Echo send service.
*
 * A service to test connectivity. The publisher uses it to
 * shadow its connection to the zmq proxy's XSUB end. This is
 * called transparently by events_init_publisher.
*
* Input:
* s - string to echo.
*
* return:
* 0 - On success
* -1 - On failure.
*/
int echo_send(const string s);
/*
* Echo receive service.
*
 * Receives the last sent echo.
 * This is transparently called by event_publish on the first
 * publish call. This helps ensure the connection to the XSUB end
 * is *most* likely (99.9%) established, as the write & read back
 * take one full loop.
 *
 * The service is closed upon read, as the echo service is one
 * shot only.
*
* output:
* s - echoed string
*
* return:
* 0 - On success
* -1 - On failure
*/
int echo_receive(string &s);
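/*
 * Illustrative echo round trip as used to shadow the publisher's
 * connect (a sketch; error handling elided):
 *
 *   svc.echo_send("hello");   // issued at init time
 *   ...
 *   string s;
 *   svc.echo_receive(s);      // first publish waits for the loop back
 */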
/*
* Global options request set
*
* Input:
 * val -- The value to set.
*
* Return:
* 0 - On Success
* -1 - On Failure
*/
int global_options_set(const char *val);
/*
* Global options request get.
*
* Input:
* val_sz -- Size of val buffer
*
* Output:
* val -- Get the current val
*
* Return:
 * > 0 - Count of bytes copied/to-be-copied.
 *       The result is truncated if the given size <= this value,
 *       but the copied string is *always* null terminated.
*
* -1 - On Failure
*/
int global_options_get(char *val, int val_sz);
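/*
 * Illustrative get with a truncation check (a sketch; the buffer
 * size is an arbitrary choice):
 *
 *   char buf[100];
 *   int rc = svc.global_options_get(buf, sizeof(buf));
 *   if (rc >= (int)sizeof(buf)) {
 *       // buf holds a null-terminated but truncated value
 *   }
 */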
/*
 * The underlying read for req/resp from client/server. The APIs
 * above use this to read the response and the server uses this
 * to read the request.
*
* Input: None
*
* Output:
* code - It is event_req_type_t for request or return code
* for response
* data - Data read, if any
*
* return:
* 0 - On success
* -1 - On failure
*/
int channel_read(int &code, event_serialized_lst_t &data);
/*
 * The underlying write for req/resp from client/server.
*
* Input:
* code - It is event_req_type_t for request or return code
* for response
* data - If any data to be sent along.
*
* Output: None
*
* return:
* 0 - On success
* -1 - On failure
*/
int channel_write(int code, const event_serialized_lst_t &data);
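/*
 * Illustrative server loop built on the two calls above (a sketch
 * of how a server like eventd could use them, not the actual
 * implementation; handle_request is hypothetical):
 *
 *   int code;
 *   event_serialized_lst_t data;
 *   while (svc.channel_read(code, data) == 0) {
 *       int rc = handle_request(code, data);  // hypothetical
 *       svc.channel_write(rc, data);
 *       if (code == EVENT_EXIT) break;
 *   }
 */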
/*
 * Send & receive helper.
 * Writes the given code & data, then reads the response data back
 * into the provided event_serialized_lst_t arg and returns the
 * response code read.
 *
 * input:
 *  code - Request code
 *  lst_in - Data to send, if any
 *
 * output:
 *  lst_out - Data read, if any
 *
 * return:
 *  A failure code, or the response code from the server.
*/
int send_recv(int code, const event_serialized_lst_t *lst_in = NULL,
event_serialized_lst_t *lst_out = NULL);
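/*
 * Illustrative client request via this helper (a sketch; e.g. a
 * stop request carries no data, so only the response code matters):
 *
 *   int rc = svc.send_recv(EVENT_CACHE_STOP);
 *   if (rc != 0) {
 *       // 1: was not running; -1: failure
 *   }
 */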
/*
* de-init/close service
*/
void close_service();
/*
* Check if service is active
*/
bool is_active() { return m_socket != NULL; }
private:
void *m_socket;
};
#endif // _EVENTS_SERVICE_H