common/events_service.cpp (162 lines of code) (raw):
#include "events_service.h"
/*
* Cache management
*
* 1) Caller is expected to call init first, which initiates the connection
* to the capture end point. Being async, it would take some milliseconds
* to connect.
*
* 2) Caller starts the cache, optionally with some local cache it may have.
* The cache service keeps it as its startup/initial stock.
* This lets the caller save its local cache with the cache service.
*
* 3) Caller calls stop, upon making its own connection to the XPUB end.
* As the caller's connect is async, and ZMQ may also hold some events
* cached locally at this end, events are read for a little longer.
*
* 4) Upon stop, the caller may read cached events.
* The events are provided in FIFO order.
* As cached events can be too many, the service returns a few at a time.
* The caller is expected to read repeatedly until no event is returned.
*
* Cache overflow:
* A ceiling is set, but memory may run out before the ceiling is reached.
* In either case, the caching is *not* completely stopped; instead only
* one event per runtime-id/publishing instance is cached. This info is
* required to compute the missed message count due to overflow and otherwise.
*/
int
event_service::init_client(void *zmq_ctx, int block_ms)
{
int rc = -1;
void *sock = zmq_socket (zmq_ctx, ZMQ_REQ);
RET_ON_ERR(sock != NULL, "Failed to get ZMQ_REQ socket rc=%d", rc);
rc = zmq_setsockopt (sock, ZMQ_LINGER, &LINGER_TIMEOUT, sizeof (LINGER_TIMEOUT));
RET_ON_ERR(rc == 0, "Failed to ZMQ_LINGER to %d", LINGER_TIMEOUT);
rc = zmq_connect (sock, get_config(REQ_REP_END_KEY).c_str());
RET_ON_ERR(rc == 0, "Failed to connect to %s", get_config(REQ_REP_END_KEY).c_str());
// Set read timeout.
//
rc = zmq_setsockopt (sock, ZMQ_RCVTIMEO, &block_ms, sizeof (block_ms));
RET_ON_ERR(rc == 0, "Failed to ZMQ_RCVTIMEO to %d", block_ms);
m_socket = sock;
sock = NULL;
out:
if (sock != NULL) {
zmq_close(sock);
}
return rc;
}
/*
 * Set up the server (ZMQ_REP) end of the service channel.
 *
 * Creates a ZMQ_REP socket from zmq_ctx, applies LINGER_TIMEOUT as its
 * ZMQ_LINGER value, binds it to the end point configured under
 * REQ_REP_END_KEY and finally applies block_ms as ZMQ_RCVTIMEO.
 * The socket is stored in m_socket only when every step succeeded;
 * on any failure it is closed here and m_socket is left untouched.
 *
 * zmq_ctx:  ZMQ context the socket is created from.
 * block_ms: value handed to ZMQ_RCVTIMEO (read timeout).
 *
 * Returns 0 on success; -1 when the socket could not be created, else the
 * non-zero result of the ZMQ call that failed.
 *
 * NOTE(review): RET_ON_ERR is defined elsewhere; it is expected to log the
 * message and jump to `out` when its condition is false.
 */
int
event_service::init_server(void *zmq_ctx, int block_ms)
{
    int rc = -1;
    void *rep_sock = zmq_socket(zmq_ctx, ZMQ_REP);

    RET_ON_ERR(rep_sock != NULL, "Failed to get ZMQ_REP socket rc=%d", rc);

    rc = zmq_setsockopt(rep_sock, ZMQ_LINGER,
            &LINGER_TIMEOUT, sizeof(LINGER_TIMEOUT));
    RET_ON_ERR(rc == 0, "Failed to ZMQ_LINGER to %d", LINGER_TIMEOUT);

    rc = zmq_bind(rep_sock, get_config(REQ_REP_END_KEY).c_str());
    RET_ON_ERR(rc == 0, "Failed to bind to %s", get_config(REQ_REP_END_KEY).c_str());

    /* Bound how long a read on this socket may block. */
    rc = zmq_setsockopt(rep_sock, ZMQ_RCVTIMEO, &block_ms, sizeof(block_ms));
    RET_ON_ERR(rc == 0, "Failed to ZMQ_RCVTIMEO to %d", block_ms);

    /* Fully configured: hand the socket over to the object. */
    m_socket = rep_sock;
    return rc;

out:
    /* Failure path: release the socket if it was ever created. */
    if (rep_sock != NULL) {
        zmq_close(rep_sock);
    }
    return rc;
}
int
event_service::echo_send(const string s)
{
event_serialized_lst_t l = { s };
return channel_write(EVENT_ECHO, l);
}
/*
 * Read an echo response from the channel.
 *
 * outs: receives the single string carried by the response. It is left
 *       untouched on any failure.
 *
 * Returns 0 only when the read succeeded, the response code is 0 and the
 * response carries exactly one entry. Otherwise returns the non-zero
 * channel_read() result, the non-zero response code, or -1 for an
 * unexpected entry count.
 *
 * NOTE(review): RET_ON_ERR is defined elsewhere; it is expected to log the
 * message and jump to `out` when its condition is false.
 */
int
event_service::echo_receive(string &outs)
{
    event_serialized_lst_t l;
    int code;

    int rc = channel_read(code, l);
    RET_ON_ERR(rc == 0, "failing to read echo rc=%d", rc);

    /*
     * Propagate a bad response code as the return value (same as
     * send_recv does); otherwise the caller would see success with
     * outs never having been set.
     */
    rc = code;
    RET_ON_ERR (rc == 0, "echo receive resp %d not 0", code);

    /* A well-formed echo response carries exactly one string. */
    rc = (l.size() == 1) ? 0 : -1;
    RET_ON_ERR (rc == 0, "echo received resp size %d is not 1",
            (int)l.size());

    outs = l[0];
out:
    return rc;
}
/*
 * Ask the service to initialize its cache.
 *
 * Sends EVENT_CACHE_INIT and, only when that succeeded, follows up with an
 * EVENT_ECHO round trip. The outcome of the echo does not affect the
 * return value.
 *
 * Returns the result of the EVENT_CACHE_INIT request (0 on success).
 */
int
event_service::cache_init()
{
    const int rc = send_recv(EVENT_CACHE_INIT);

    if (rc != 0) {
        return rc;
    }

    /* To shadow subscribe connect required for cache init */
    send_recv(EVENT_ECHO);

    return rc;
}
/*
 * Ask the service to start caching.
 *
 * lst: list sent along with the EVENT_CACHE_START request.
 *
 * Returns the result of send_recv(): 0 on success, non-zero otherwise
 * (a failure is also logged through RET_ON_ERR).
 */
int
event_service::cache_start(const event_serialized_lst_t &lst)
{
    int rc = send_recv(EVENT_CACHE_START, &lst);

    RET_ON_ERR(rc == 0, "Failed to send cache start rc=%d", rc);
out:
    return rc;
}
/*
 * Ask the service to stop caching.
 *
 * Returns the result of the EVENT_CACHE_STOP round trip: 0 on success,
 * non-zero otherwise (a failure is also logged through RET_ON_ERR).
 */
int
event_service::cache_stop()
{
    int rc = send_recv(EVENT_CACHE_STOP);

    RET_ON_ERR(rc == 0, "Failed to send cache stop rc=%d", rc);
out:
    return rc;
}
/*
 * Read cached events from the service.
 *
 * lst: receives the data returned with the EVENT_CACHE_READ response.
 *
 * Returns the result of send_recv(): 0 on success, non-zero otherwise
 * (a failure is also logged through RET_ON_ERR).
 */
int
event_service::cache_read(event_serialized_lst_t &lst)
{
    int rc = send_recv(EVENT_CACHE_READ, NULL, &lst);

    RET_ON_ERR(rc == 0, "Failed to send cache read rc=%d", rc);
out:
    return rc;
}
/*
 * Send global options to the service.
 *
 * val: NUL-terminated options string; sent as the single entry of an
 *      EVENT_OPTIONS request. Must not be NULL.
 *
 * Returns 0 on success; -1 when val is NULL (nothing is sent), else the
 * non-zero result of send_recv().
 *
 * NOTE(review): RET_ON_ERR is defined elsewhere; it is expected to log the
 * message and jump to `out` when its condition is false.
 */
int
event_service::global_options_set(const char *val)
{
    int rc = -1;
    event_serialized_lst_t lst;

    /* Constructing a string from a NULL pointer is undefined; reject it. */
    RET_ON_ERR(val != NULL, "Invalid NULL global options value rc=%d", rc);

    lst.push_back(string(val));

    rc = send_recv(EVENT_OPTIONS, &lst, NULL);
    RET_ON_ERR(rc == 0, "Failed to send global options request rc=%d", rc);
out:
    return rc;
}
/*
 * Fetch global options from the service into a caller-provided buffer.
 *
 * val: destination buffer; must not be NULL.
 * sz:  size of val in bytes; must be > 0.
 *
 * Sends an EVENT_OPTIONS request with no data and copies the first entry
 * of the response (or an empty string when the response has no entries)
 * into val. The copy is truncated to fit and val is always NUL-terminated.
 *
 * Returns the full length of the options string, which may exceed what fit
 * into val; returns -1 when the buffer is invalid (no request is sent) or
 * when the request fails.
 *
 * NOTE(review): RET_ON_ERR is defined elsewhere; it is expected to log the
 * message and jump to `out` when its condition is false.
 */
int
event_service::global_options_get(char *val, int sz)
{
    int ret = -1, rc;
    string s;
    event_serialized_lst_t lst;

    /*
     * Validate the buffer first: with sz <= 0 the terminating write to
     * val[sz - 1] below would land outside the buffer.
     */
    RET_ON_ERR((val != NULL) && (sz > 0),
            "Invalid buffer for global options sz=%d", sz);

    RET_ON_ERR((rc = send_recv(EVENT_OPTIONS, NULL, &lst)) == 0,
            "Failed to receive global options request rc=%d", rc);

    if (!lst.empty()) {
        s = *lst.begin();
    }

    /* strncpy does not terminate on truncation, so terminate explicitly. */
    strncpy(val, s.c_str(), sz);
    val[sz - 1] = 0;

    ret = (int)s.size();
out:
    return ret;
}
int
event_service::channel_read(int &code, event_serialized_lst_t &data)
{
event_serialized_lst_t().swap(data);
return zmq_message_read(m_socket, 0, code, data);
}
/*
 * Write one message to m_socket.
 *
 * code: code to send with the message.
 * data: list to send with the message.
 *
 * Returns whatever zmq_message_send() returns.
 */
int
event_service::channel_write(int code, const event_serialized_lst_t &data)
{
    const int rc = zmq_message_send(m_socket, code, data);

    return rc;
}
int
event_service::send_recv(int code, const event_serialized_lst_t *lst_in,
event_serialized_lst_t *lst_out)
{
event_serialized_lst_t l;
int resp;
if(lst_in == NULL) {
lst_in = &l;
}
int rc = channel_write(code, *lst_in);
RET_ON_ERR(rc == 0, "failing to send code=%d", code);
if (lst_out == NULL) {
lst_out = &l;
}
rc = channel_read(resp, *lst_out);
RET_ON_ERR(rc == 0, "failing to read resp for code=%d", code);
rc = resp;
RET_ON_ERR (rc == 0, "receive resp %d not 0 for code=%d", resp, code);
out:
return rc;
}
/*
 * Close the service socket, if one is open.
 *
 * m_socket is reset to NULL afterwards, so calling this again is a no-op.
 */
void
event_service::close_service()
{
    if (m_socket == NULL) {
        return;
    }

    zmq_close(m_socket);
    m_socket = NULL;
}