int zookeeper_interest()

in zookeeper-client/zookeeper-client-c/src/zookeeper.c [2509:2714]


/*
 * Compute what the caller's event loop should wait on before calling back
 * into the client: the socket, the READ/WRITE interest mask, and a timeout.
 *
 * When the handle has no socket (sock == -1) this either schedules a short
 * back-off (zh->delay set after a failed pass over the server pool, or
 * reconnection attempts disabled) or opens a non-blocking socket and starts
 * connecting to the next server. When a socket exists it enforces the
 * receive timeout, sends a PING once 1/3 of recv_timeout passes with nothing
 * sent and no requests outstanding, probes for a read/write server while in
 * read-only mode, and records zh->next_deadline for the next call.
 *
 * zh       - client handle; must not be NULL.
 * fd       - out: socket to wait on (-1 when there is currently none).
 * interest - out: bitmask of ZOOKEEPER_READ / ZOOKEEPER_WRITE (0 if no socket).
 * tv       - out: maximum time to wait before calling back in.
 *
 * Returns ZOK on success; ZBADARGUMENTS if any pointer argument is NULL;
 * ZINVALIDSTATE if the handle is unrecoverable; ZSSLCONNECTIONERROR if SSL
 * setup fails; otherwise the code produced by update_addrs(), the socket
 * error handler, prime_connection() or send_ping().
 *
 * Every exit after api_prolog() goes through api_epilog() so the two calls
 * stay paired.
 */
int zookeeper_interest(zhandle_t *zh, socket_t *fd, int *interest,
     struct timeval *tv)
{
    int sock_flags;
    int rc = 0;
    struct timeval now;

#ifdef SOCK_CLOEXEC_ENABLED
    sock_flags = SOCK_STREAM | SOCK_CLOEXEC;
#else
    sock_flags = SOCK_STREAM;
#endif

    if(zh==0 || fd==0 ||interest==0 || tv==0)
        return ZBADARGUMENTS;
    if (is_unrecoverable(zh))
        return ZINVALIDSTATE;
    get_system_time(&now);
    if(zh->next_deadline.tv_sec!=0 || zh->next_deadline.tv_usec!=0){
        /* Warn when the caller returns noticeably later than the deadline we
         * asked for. The tolerance is recv_timeout/10, capped at 200ms.
         * NOTE(review): assumes calculate_interval(start, end) yields
         * end - start in ms, i.e. how far `now` is past the deadline, as the
         * log message implies -- confirm. */
        int time_left = calculate_interval(&zh->next_deadline, &now);
        int max_exceed = zh->recv_timeout / 10 > 200 ? 200 :
                         (zh->recv_timeout / 10);
        if (time_left > max_exceed)
            LOG_WARN(LOGCALLBACK(zh), "Exceeded deadline by %dms", time_left);
    }
    api_prolog(zh);

    rc = update_addrs(zh, &now);
    if (rc != ZOK) {
        return api_epilog(zh, rc);
    }

    *fd = zh->fd->sock;
    *interest = 0;
    tv->tv_sec = 0;
    tv->tv_usec = 0;

    if (*fd == -1) {
        /*
         * If we previously failed to connect to the server pool
         * (zh->delay == 1), delay the connection attempt on this iteration
         * by 1/60 of the recv timeout before trying again, so we don't spin.
         *
         * The delay flag is always cleared here. If we fail again it is set
         * again, and the next iteration behaves the same way.
         *
         * We also delay when disable_reconnection_attempt is set.
         */
        if (zh->delay == 1 || zh->disable_reconnection_attempt == 1) {
            *tv = get_timeval(zh->recv_timeout/60);
            zh->delay = 0;

            lock_reconfig(zh);
            LOG_WARN(LOGCALLBACK(zh), "Delaying connection after exhaustively trying all servers [%s]", zh->hostname);
            unlock_reconfig(zh);
        } else {
            /* Prefer a read/write server discovered while in read-only mode;
             * otherwise advance to the next server in the pool. */
            if (addr_rw_server) {
                zh->addr_cur = *addr_rw_server;
                addr_rw_server = 0;
            } else {
                // No need to delay -- grab the next server and attempt connection
                zoo_cycle_next_server(zh);
            }
            zh->fd->sock  = socket(zh->addr_cur.ss_family, sock_flags, 0);
            if (zh->fd->sock < 0) {
                rc = handle_socket_error_msg(zh,
                                             __LINE__,
                                             __func__,
                                             ZSYSTEMERROR,
                                             "socket() call failed");
                return api_epilog(zh, rc);
            }

            zookeeper_set_sock_nodelay(zh, zh->fd->sock);
            zookeeper_set_sock_noblock(zh, zh->fd->sock);

            rc = zookeeper_connect(zh, &zh->addr_cur, zh->fd->sock);

            if (rc == -1) {
                /* we are handling the non-blocking connect according to
                 * the description in section 16.3 "Non-blocking connect"
                 * in UNIX Network Programming vol 1, 3rd edition */
                if (errno == EWOULDBLOCK || errno == EINPROGRESS) {
                    // For SSL, we first go to ZOO_SSL_CONNECTING_STATE
                    if (zh->fd->cert != NULL)
                        zh->state = ZOO_SSL_CONNECTING_STATE;
                    else
                        zh->state = ZOO_CONNECTING_STATE;
                } else {
                    rc = handle_socket_error_msg(zh,
                                                 __LINE__,
                                                 __func__,
                                                 ZCONNECTIONLOSS,
                                                 "connect() call failed");
                    return api_epilog(zh, rc);
                }
            } else {
                /* connect() completed immediately. */
#ifdef HAVE_OPENSSL_H
                if (zh->fd->cert != NULL) {
                    // We do SSL_connect() here
                    if (init_ssl_for_handler(zh) != ZOK) {
                        /* Still go through api_epilog() so it stays paired
                         * with the api_prolog() call above. */
                        return api_epilog(zh, ZSSLCONNECTIONERROR);
                    }
                }
#endif
                rc = prime_connection(zh);
                if (rc != 0) {
                    return api_epilog(zh,rc);
                }

                LOG_INFO(LOGCALLBACK(zh),
                         "Initiated connection to server %s",
                         format_endpoint_info(&zh->addr_cur));
            }
            *tv = get_timeval(zh->recv_timeout/3);
        }
        *fd = zh->fd->sock;
        /* Reset the idle/ping clocks so the fresh connection attempt is not
         * immediately considered timed out. */
        zh->last_recv = now;
        zh->last_send = now;
        zh->last_ping = now;
        zh->last_ping_rw = now;
        zh->ping_rw_timeout = MIN_RW_TIMEOUT;
    }

    if (zh->fd->sock != -1) {
        int idle_recv = calculate_interval(&zh->last_recv, &now);
        int idle_send = calculate_interval(&zh->last_send, &now);
        int recv_to = zh->recv_timeout*2/3 - idle_recv;
        int send_to = zh->recv_timeout/3;
        // have we exceeded the receive timeout threshold?
        if (recv_to <= 0 && zh->state != ZOO_SSL_CONNECTING_STATE) {
            // We gotta cut our losses and connect to someone else
#ifdef _WIN32
            errno = WSAETIMEDOUT;
#else
            errno = ETIMEDOUT;
#endif
            *interest=0;
            *tv = get_timeval(0);
            return api_epilog(zh,handle_socket_error_msg(zh,
                    __LINE__, __func__, ZOPERATIONTIMEOUT,
                    "connection to %s timed out (exceeded timeout by %dms)",
                    format_endpoint_info(&zh->addr_cur),
                    -recv_to));

        }

        // We only allow 1/3 of our timeout time to expire before sending
        // a PING
        if (is_connected(zh)) {
            send_to = zh->recv_timeout/3 - idle_send;
            if (send_to <= 0) {
                /* Only ping when nothing is outstanding; pending requests
                 * already exercise the connection. */
                if (zh->sent_requests.head == 0) {
                    rc = send_ping(zh);
                    if (rc < 0) {
                        LOG_ERROR(LOGCALLBACK(zh), "failed to send PING request (zk retcode=%d)",rc);
                        return api_epilog(zh,rc);
                    }
                }
                send_to = zh->recv_timeout/3;
            }
        }

        // If we are in read-only mode, seek for read/write server
        if (zh->state == ZOO_READONLY_STATE) {
            int idle_ping_rw = calculate_interval(&zh->last_ping_rw, &now);
            if (idle_ping_rw >= zh->ping_rw_timeout) {
                zh->last_ping_rw = now;
                idle_ping_rw = 0;
                /* Exponential back-off between probes, capped at
                 * MAX_RW_TIMEOUT. */
                zh->ping_rw_timeout = min(zh->ping_rw_timeout * 2,
                                          MAX_RW_TIMEOUT);
                if (ping_rw_server(zh)) {
                    struct sockaddr_storage addr;
                    addrvec_peek(&zh->addrs, &addr);
                    zh->ping_rw_timeout = MIN_RW_TIMEOUT;
                    LOG_INFO(LOGCALLBACK(zh),
                             "r/w server found at %s",
                             format_endpoint_info(&addr));
                    /* NOTE(review): *fd was captured before this cleanup()
                     * and interest/tv are still computed below for it; if
                     * cleanup() closes the socket the caller may wait on a
                     * stale descriptor -- confirm this is intended. */
                    cleanup(zh, ZOK);
                } else {
                    addrvec_next(&zh->addrs, NULL);
                }
            }
            send_to = min(send_to, zh->ping_rw_timeout - idle_ping_rw);
        }

        // choose the lesser value as the timeout
        *tv = get_timeval(min(recv_to, send_to));

        /* Remember when we expect to be called again; normalize so that
         * tv_usec stays within [0, 1000000). */
        zh->next_deadline.tv_sec = now.tv_sec + tv->tv_sec;
        zh->next_deadline.tv_usec = now.tv_usec + tv->tv_usec;
        if (zh->next_deadline.tv_usec >= 1000000) {
            zh->next_deadline.tv_sec += zh->next_deadline.tv_usec / 1000000;
            zh->next_deadline.tv_usec = zh->next_deadline.tv_usec % 1000000;
        }
        *interest = ZOOKEEPER_READ;
        /* we are interested in a write if we are connected and have something
         * to send, or we are waiting for a connect to finish. */
        if ((zh->to_send.head && (is_connected(zh) || is_sasl_auth_in_progress(zh)))
            || zh->state == ZOO_CONNECTING_STATE
            || zh->state == ZOO_SSL_CONNECTING_STATE) {
            *interest |= ZOOKEEPER_WRITE;
        }
    }
    return api_epilog(zh,ZOK);
}