in zookeeper-client/zookeeper-client-c/src/zookeeper.c [3391:3528]
int zookeeper_process(zhandle_t *zh, int events)
{
buffer_list_t *bptr;
int rc;
if (zh==NULL)
return ZBADARGUMENTS;
if (is_unrecoverable(zh))
return ZINVALIDSTATE;
api_prolog(zh);
IF_DEBUG(checkResponseLatency(zh));
rc = check_events(zh, events);
if (rc!=ZOK)
return api_epilog(zh, rc);
IF_DEBUG(isSocketReadable(zh));
while (rc >= 0 && (bptr=dequeue_buffer(&zh->to_process))) {
struct ReplyHeader hdr;
struct iarchive *ia = create_buffer_iarchive(
bptr->buffer, bptr->curr_offset);
deserialize_ReplyHeader(ia, "hdr", &hdr);
if (hdr.xid == PING_XID) {
// Ping replies can arrive out-of-order
int elapsed = 0;
struct timeval now;
gettimeofday(&now, 0);
elapsed = calculate_interval(&zh->last_ping, &now);
LOG_DEBUG(LOGCALLBACK(zh), "Got ping response in %d ms", elapsed);
free_buffer(bptr);
} else if (hdr.xid == WATCHER_EVENT_XID) {
struct WatcherEvent evt;
int type = 0;
char *path = NULL;
completion_list_t *c = NULL;
LOG_DEBUG(LOGCALLBACK(zh), "Processing WATCHER_EVENT");
deserialize_WatcherEvent(ia, "event", &evt);
type = evt.type;
path = evt.path;
/* We are doing a notification, so there is no pending request */
c = create_completion_entry(zh, WATCHER_EVENT_XID,-1,0,0,0,0);
c->buffer = bptr;
lock_watchers(zh);
c->c.watcher_result = collectWatchers(zh, type, path);
unlock_watchers(zh);
// We cannot free until now, otherwise path will become invalid
deallocate_WatcherEvent(&evt);
queue_completion(&zh->completions_to_process, c, 0);
} else if (hdr.xid == SET_WATCHES_XID) {
LOG_DEBUG(LOGCALLBACK(zh), "Processing SET_WATCHES");
free_buffer(bptr);
} else if (hdr.xid == AUTH_XID){
LOG_DEBUG(LOGCALLBACK(zh), "Processing AUTH_XID");
/* special handling for the AUTH response as it may come back
* out-of-band */
auth_completion_func(hdr.err,zh);
free_buffer(bptr);
/* authentication completion may change the connection state to
* unrecoverable */
if(is_unrecoverable(zh)){
handle_error(zh, ZAUTHFAILED);
close_buffer_iarchive(&ia);
return api_epilog(zh, ZAUTHFAILED);
}
} else {
int rc = hdr.err;
/* Find the request corresponding to the response */
completion_list_t *cptr = dequeue_completion(&zh->sent_requests);
/* [ZOOKEEPER-804] Don't assert if zookeeper_close has been called. */
if (zh->close_requested == 1 && cptr == NULL) {
LOG_DEBUG(LOGCALLBACK(zh), "Completion queue has been cleared by zookeeper_close()");
close_buffer_iarchive(&ia);
free_buffer(bptr);
return api_epilog(zh,ZINVALIDSTATE);
}
assert(cptr);
/* The requests are going to come back in order */
if (cptr->xid != hdr.xid) {
LOG_DEBUG(LOGCALLBACK(zh), "Processing unexpected or out-of-order response!");
// received unexpected (or out-of-order) response
close_buffer_iarchive(&ia);
free_buffer(bptr);
// put the completion back on the queue (so it gets properly
// signaled and deallocated) and disconnect from the server
queue_completion(&zh->sent_requests,cptr,1);
return api_epilog(zh,
handle_socket_error_msg(zh, __LINE__, __func__, ZRUNTIMEINCONSISTENCY,
"unexpected server response: expected %#x, but received %#x",
hdr.xid,cptr->xid));
}
if (hdr.zxid > 0) {
// Update last_zxid only when it is a request response
zh->last_zxid = hdr.zxid;
}
lock_watchers(zh);
activateWatcher(zh, cptr->watcher, rc);
deactivateWatcher(zh, cptr->watcher_deregistration, rc);
unlock_watchers(zh);
if (cptr->c.void_result != SYNCHRONOUS_MARKER) {
LOG_DEBUG(LOGCALLBACK(zh), "Queueing asynchronous response");
cptr->buffer = bptr;
queue_completion(&zh->completions_to_process, cptr, 0);
} else {
#ifdef THREADED
struct sync_completion
*sc = (struct sync_completion*)cptr->data;
sc->rc = rc;
process_sync_completion(zh, cptr, sc, ia);
notify_sync_completion(sc);
free_buffer(bptr);
zh->outstanding_sync--;
destroy_completion_entry(cptr);
#else
abort_singlethreaded(zh);
#endif
}
}
close_buffer_iarchive(&ia);
}
if (process_async(zh->outstanding_sync)) {
process_completions(zh);
}
return api_epilog(zh, ZOK);
}