static int check_events()

in zookeeper-client/zookeeper-client-c/src/zookeeper.c [2945:3084]


/*
 * React to the socket events reported for the connection owned by `zh`.
 *
 * Depending on the handle state and the event mask this function:
 *   - (OpenSSL builds) finishes a pending non-blocking connect and starts
 *     the SSL setup via init_ssl_for_handler();
 *   - finishes a pending non-blocking connect and sends the connection
 *     primer via prime_connection();
 *   - flushes the outgoing queue without blocking;
 *   - reads from the socket and either queues the received buffer for
 *     processing, feeds it to the SASL exchange, or completes the session
 *     handshake when the buffer is the primer response.
 *
 * zh      handle whose socket is being serviced.
 * events  bit mask tested against ZOOKEEPER_WRITE and ZOOKEEPER_READ.
 *
 * Returns:
 *   ZINVALIDSTATE        if the handle has no socket (sock == -1);
 *   ZSSLCONNECTIONERROR  if init_ssl_for_handler() does not return ZOK;
 *   the value of handle_socket_error_msg() (invoked with ZCONNECTIONLOSS
 *                        or ZSESSIONEXPIRED) on socket/session failures;
 *   the non-zero result of prime_connection(), or the negative result of
 *                        the SASL helpers, when those fail;
 *   ZNOTHING             if a read was requested but recv_buffer()
 *                        returned 0;
 *   ZOK                  otherwise.
 */
static int check_events(zhandle_t *zh, int events)
{
    if (zh->fd->sock == -1)
        return ZINVALIDSTATE;

#ifdef HAVE_OPENSSL_H
    if ((events&ZOOKEEPER_WRITE) && (zh->state == ZOO_SSL_CONNECTING_STATE) && zh->fd->cert != NULL) {
        int rc, error;
        socklen_t len = sizeof(error);
        rc = getsockopt(zh->fd->sock, SOL_SOCKET, SO_ERROR, &error, &len);
        /* the description in section 16.4 "Non-blocking connect"
         * in UNIX Network Programming vol 1, 3rd edition, points out
         * that sometimes the error is in errno and sometimes in error */
        if (rc < 0 || error) {
            /* Normalize: make errno carry the failure in both cases. */
            if (rc == 0)
                errno = error;
            return handle_socket_error_msg(zh, __LINE__, __func__, ZCONNECTIONLOSS,
                "server refused to accept the client");
        }
        // We do SSL_connect() here
        if (init_ssl_for_handler(zh) != ZOK) {
            return ZSSLCONNECTIONERROR;
        }
        /* No return here: execution continues with the checks below. */
    }
#endif

    if ((events&ZOOKEEPER_WRITE)&&(zh->state == ZOO_CONNECTING_STATE)) {
        int rc, error;
        socklen_t len = sizeof(error);
        rc = getsockopt(zh->fd->sock, SOL_SOCKET, SO_ERROR, &error, &len);
        /* the description in section 16.4 "Non-blocking connect"
         * in UNIX Network Programming vol 1, 3rd edition, points out
         * that sometimes the error is in errno and sometimes in error */
        if (rc < 0 || error) {
            /* Normalize: make errno carry the failure in both cases. */
            if (rc == 0)
                errno = error;
            return handle_socket_error_msg(zh, __LINE__, __func__, ZCONNECTIONLOSS,
                "server refused to accept the client");
        }

        /* The connect succeeded; send the connection primer. */
        if((rc=prime_connection(zh))!=0)
            return rc;

        LOG_INFO(LOGCALLBACK(zh), "initiated connection to server %s", format_endpoint_info(&zh->addr_cur));
        return ZOK;
    }

    if (zh->to_send.head && (events&ZOOKEEPER_WRITE)) {
        /* make the flush call non-blocking by specifying a 0 timeout */
        int rc=flush_send_queue(zh,0);
        if (rc < 0)
            return handle_socket_error_msg(zh, __LINE__, __func__, ZCONNECTIONLOSS,
                "failed while flushing send queue");
    }
    if (events&ZOOKEEPER_READ) {
        int rc;
        if (zh->input_buffer == 0) {
            /* NOTE(review): the result of allocate_buffer() is handed to
             * recv_buffer() unchecked -- confirm it cannot be NULL here. */
            zh->input_buffer = allocate_buffer(0,0);
        }

        rc = recv_buffer(zh, zh->input_buffer);
        if (rc < 0) {
            return handle_socket_error_msg(zh, __LINE__, __func__, ZCONNECTIONLOSS,
                "failed while receiving a server response");
        }
        if (rc > 0) {
            /* Something was received: remember when we last heard from
             * the server. */
            get_system_time(&zh->last_recv);
            if (zh->input_buffer != &zh->primer_buffer) {
                if (is_connected(zh) || !is_sasl_auth_in_progress(zh)) {
                    /* Regular response: hand the buffer to the processing
                     * queue. */
                    queue_buffer(&zh->to_process, zh->input_buffer, 0);
#ifdef HAVE_CYRUS_SASL_H
                } else {
                    /* Not connected yet and SASL is in progress: the
                     * buffer is consumed by the SASL exchange and freed
                     * here instead of being queued. */
                    rc = process_sasl_response(zh, zh->input_buffer->buffer, zh->input_buffer->curr_offset);
                    free_buffer(zh->input_buffer);
                    zh->input_buffer = 0;
                    if (rc < 0) {
                        zoo_sasl_mark_failed(zh);
                        return rc;
                    } else if (zh->sasl_client->state == ZOO_SASL_COMPLETE) {
                        /*
                         * SASL authentication just completed; send
                         * watches, auth. info, etc. now.
                         */
                        finalize_session_establishment(zh);
                    }
#endif /* HAVE_CYRUS_SASL_H */
                }
            } else  {
                int64_t oldid, newid;
                //deserialize
                deserialize_prime_response(&zh->primer_storage, zh->primer_buffer.buffer);
                /* We are processing the primer_buffer, so we need to finish
                 * the connection handshake */
                oldid = zh->seen_rw_server_before ? zh->client_id.client_id : 0;
                zh->seen_rw_server_before |= !zh->primer_storage.readOnly;
                newid = zh->primer_storage.sessionId;
                if (oldid != 0 && oldid != newid) {
                    /* The server answered with a different session id than
                     * the one we held: treat the old session as expired. */
                    zh->state = ZOO_EXPIRED_SESSION_STATE;
                    errno = ESTALE;
                    /* %llx requires an unsigned long long argument; int64_t
                     * may be a different type (e.g. long), so cast
                     * explicitly to keep the variadic call well defined. */
                    return handle_socket_error_msg(zh, __LINE__, __func__, ZSESSIONEXPIRED,
                            "sessionId=%#llx has expired.",
                            (unsigned long long)oldid);
                } else {
                    /* Adopt the session parameters from the primer
                     * response. */
                    zh->recv_timeout = zh->primer_storage.timeOut;
                    zh->client_id.client_id = newid;

                    memcpy(zh->client_id.passwd, &zh->primer_storage.passwd,
                           sizeof(zh->client_id.passwd));

#ifdef HAVE_CYRUS_SASL_H
                    if (zh->sasl_client) {
                        /*
                         * Start a SASL authentication session.
                         * Watches, auth. info, etc. will be sent
                         * after it completes.
                         */
                        rc = zoo_sasl_connect(zh);
                        rc = rc < 0 ? rc : zoo_sasl_client_start(zh);
                        if (rc < 0) {
                            zoo_sasl_mark_failed(zh);
                            return rc;
                        }
                    } else {
                        /* Can send watches, auth. info, etc. immediately. */
                        finalize_session_establishment(zh);
                    }
#else /* HAVE_CYRUS_SASL_H */
                    /* Can send watches, auth. info, etc. immediately. */
                    finalize_session_establishment(zh);
#endif /* HAVE_CYRUS_SASL_H */
                }
            }
            /* The buffer has been queued, freed, or was the primer buffer;
             * in every case drop our reference so the next read starts
             * with a fresh one. */
            zh->input_buffer = 0;
        } else {
            // zookeeper_process was called but there was nothing to read
            // from the socket
            return ZNOTHING;
        }
    }
    return ZOK;
}