int zookeeper_process()

in zookeeper-client/zookeeper-client-c/src/zookeeper.c [3391:3528]


int zookeeper_process(zhandle_t *zh, int events)
{
    buffer_list_t *bptr;
    int rc;

    if (zh==NULL)
        return ZBADARGUMENTS;
    if (is_unrecoverable(zh))
        return ZINVALIDSTATE;
    api_prolog(zh);
    IF_DEBUG(checkResponseLatency(zh));
    rc = check_events(zh, events);
    if (rc!=ZOK)
        return api_epilog(zh, rc);

    IF_DEBUG(isSocketReadable(zh));

    while (rc >= 0 && (bptr=dequeue_buffer(&zh->to_process))) {
        struct ReplyHeader hdr;
        struct iarchive *ia = create_buffer_iarchive(
                                    bptr->buffer, bptr->curr_offset);
        deserialize_ReplyHeader(ia, "hdr", &hdr);

        if (hdr.xid == PING_XID) {
            // Ping replies can arrive out-of-order
            int elapsed = 0;
            struct timeval now;
            gettimeofday(&now, 0);
            elapsed = calculate_interval(&zh->last_ping, &now);
            LOG_DEBUG(LOGCALLBACK(zh), "Got ping response in %d ms", elapsed);
            free_buffer(bptr);
        } else if (hdr.xid == WATCHER_EVENT_XID) {
            struct WatcherEvent evt;
            int type = 0;
            char *path = NULL;
            completion_list_t *c = NULL;

            LOG_DEBUG(LOGCALLBACK(zh), "Processing WATCHER_EVENT");

            deserialize_WatcherEvent(ia, "event", &evt);
            type = evt.type;
            path = evt.path;
            /* We are doing a notification, so there is no pending request */
            c = create_completion_entry(zh, WATCHER_EVENT_XID,-1,0,0,0,0);
            c->buffer = bptr;
            lock_watchers(zh);
            c->c.watcher_result = collectWatchers(zh, type, path);
            unlock_watchers(zh);

            // We cannot free until now, otherwise path will become invalid
            deallocate_WatcherEvent(&evt);
            queue_completion(&zh->completions_to_process, c, 0);
        } else if (hdr.xid == SET_WATCHES_XID) {
            LOG_DEBUG(LOGCALLBACK(zh), "Processing SET_WATCHES");
            free_buffer(bptr);
        } else if (hdr.xid == AUTH_XID){
            LOG_DEBUG(LOGCALLBACK(zh), "Processing AUTH_XID");

            /* special handling for the AUTH response as it may come back
             * out-of-band */
            auth_completion_func(hdr.err,zh);
            free_buffer(bptr);
            /* authentication completion may change the connection state to
             * unrecoverable */
            if(is_unrecoverable(zh)){
                handle_error(zh, ZAUTHFAILED);
                close_buffer_iarchive(&ia);
                return api_epilog(zh, ZAUTHFAILED);
            }
        } else {
            int rc = hdr.err;
            /* Find the request corresponding to the response */
            completion_list_t *cptr = dequeue_completion(&zh->sent_requests);

            /* [ZOOKEEPER-804] Don't assert if zookeeper_close has been called. */
            if (zh->close_requested == 1 && cptr == NULL) {
                LOG_DEBUG(LOGCALLBACK(zh), "Completion queue has been cleared by zookeeper_close()");
                close_buffer_iarchive(&ia);
                free_buffer(bptr);
                return api_epilog(zh,ZINVALIDSTATE);
            }
            assert(cptr);
            /* The requests are going to come back in order */
            if (cptr->xid != hdr.xid) {
                LOG_DEBUG(LOGCALLBACK(zh), "Processing unexpected or out-of-order response!");

                // received unexpected (or out-of-order) response
                close_buffer_iarchive(&ia);
                free_buffer(bptr);
                // put the completion back on the queue (so it gets properly
                // signaled and deallocated) and disconnect from the server
                queue_completion(&zh->sent_requests,cptr,1);
                return api_epilog(zh,
                                  handle_socket_error_msg(zh, __LINE__, __func__, ZRUNTIMEINCONSISTENCY,
                                  "unexpected server response: expected %#x, but received %#x",
                                  hdr.xid,cptr->xid));
            }

            if (hdr.zxid > 0) {
                // Update last_zxid only when it is a request response
                zh->last_zxid = hdr.zxid;
            }
            lock_watchers(zh);
            activateWatcher(zh, cptr->watcher, rc);
            deactivateWatcher(zh, cptr->watcher_deregistration, rc);
            unlock_watchers(zh);

            if (cptr->c.void_result != SYNCHRONOUS_MARKER) {
                LOG_DEBUG(LOGCALLBACK(zh), "Queueing asynchronous response");
                cptr->buffer = bptr;
                queue_completion(&zh->completions_to_process, cptr, 0);
            } else {
#ifdef THREADED
                struct sync_completion
                        *sc = (struct sync_completion*)cptr->data;
                sc->rc = rc;

                process_sync_completion(zh, cptr, sc, ia);

                notify_sync_completion(sc);
                free_buffer(bptr);
                zh->outstanding_sync--;
                destroy_completion_entry(cptr);
#else
                abort_singlethreaded(zh);
#endif
            }
        }

        close_buffer_iarchive(&ia);

    }
    if (process_async(zh->outstanding_sync)) {
        process_completions(zh);
    }

    return api_epilog(zh, ZOK);
}