in zookeeper-client/zookeeper-client-c/src/zookeeper.c [2509:2714]
/**
 * Tell the caller's event loop which socket to watch, for which events, and
 * for how long, before it should call back into the client.
 *
 * When no socket is open, this either schedules a back-off delay (after a
 * previous failed pass over the servers, or when reconnection attempts are
 * disabled) or opens a non-blocking socket to the next server and starts
 * connecting to it. When a socket is open, it enforces the receive timeout,
 * sends a PING if nothing has been sent for recv_timeout/3 and no request is
 * outstanding, probes for a read/write server while in read-only mode, and
 * records the resulting deadline in zh->next_deadline.
 *
 * @param zh       the ZooKeeper handle
 * @param fd       out: the socket to watch (-1 when none is open)
 * @param interest out: bitmask of ZOOKEEPER_READ / ZOOKEEPER_WRITE
 * @param tv       out: the maximum time to wait before calling back in
 * @return ZOK on success;
 *         ZBADARGUMENTS if any argument is NULL;
 *         ZINVALIDSTATE if the handle is unrecoverable;
 *         ZSYSTEMERROR if socket() fails;
 *         ZCONNECTIONLOSS if connect() fails immediately;
 *         ZSSLCONNECTIONERROR if SSL setup fails;
 *         ZOPERATIONTIMEOUT if the receive timeout was exceeded;
 *         or the error returned by update_addrs(), prime_connection()
 *         or send_ping().
 */
int zookeeper_interest(zhandle_t *zh, socket_t *fd, int *interest,
     struct timeval *tv)
{
    int sock_flags;
    int rc = 0;
    struct timeval now;
#ifdef SOCK_CLOEXEC_ENABLED
    sock_flags = SOCK_STREAM | SOCK_CLOEXEC;
#else
    sock_flags = SOCK_STREAM;
#endif
    if(zh==0 || fd==0 ||interest==0 || tv==0)
        return ZBADARGUMENTS;
    if (is_unrecoverable(zh))
        return ZINVALIDSTATE;
    get_system_time(&now);
    /* Warn when the caller came back later than the deadline handed out on
     * the previous call. The tolerance is recv_timeout/10, capped at 200ms. */
    if(zh->next_deadline.tv_sec!=0 || zh->next_deadline.tv_usec!=0){
        int time_left = calculate_interval(&zh->next_deadline, &now);
        int max_exceed = zh->recv_timeout / 10 > 200 ? 200 :
            (zh->recv_timeout / 10);
        if (time_left > max_exceed)
            LOG_WARN(LOGCALLBACK(zh), "Exceeded deadline by %dms", time_left);
    }
    /* From here on, every exit must go through api_epilog() so that it
     * always pairs with this api_prolog(). */
    api_prolog(zh);
    rc = update_addrs(zh, &now);
    if (rc != ZOK) {
        return api_epilog(zh, rc);
    }
    *fd = zh->fd->sock;
    *interest = 0;
    tv->tv_sec = 0;
    tv->tv_usec = 0;
    if (*fd == -1) {
        /*
         * If we previously failed to connect to the server pool
         * (zh->delay == 1), delay this connection attempt by 1/60 of the
         * recv timeout before trying again, so that we don't spin.
         *
         * The delay flag is always cleared here. If we fail again, it is set
         * again and the next iteration does the same.
         *
         * We also delay if disable_reconnection_attempt is set.
         */
        if (zh->delay == 1 || zh->disable_reconnection_attempt == 1) {
            *tv = get_timeval(zh->recv_timeout/60);
            zh->delay = 0;
            lock_reconfig(zh);
            LOG_WARN(LOGCALLBACK(zh), "Delaying connection after exhaustively trying all servers [%s]", zh->hostname);
            unlock_reconfig(zh);
        } else {
            /* If a specific server was stashed in addr_rw_server, try it
             * next (and consume it); otherwise advance through the list. */
            if (addr_rw_server) {
                zh->addr_cur = *addr_rw_server;
                addr_rw_server = 0;
            } else {
                // No need to delay -- grab the next server and attempt connection
                zoo_cycle_next_server(zh);
            }
            zh->fd->sock = socket(zh->addr_cur.ss_family, sock_flags, 0);
            if (zh->fd->sock < 0) {
                rc = handle_socket_error_msg(zh,
                                             __LINE__,
                                             __func__,
                                             ZSYSTEMERROR,
                                             "socket() call failed");
                return api_epilog(zh, rc);
            }
            zookeeper_set_sock_nodelay(zh, zh->fd->sock);
            zookeeper_set_sock_noblock(zh, zh->fd->sock);
            rc = zookeeper_connect(zh, &zh->addr_cur, zh->fd->sock);
            if (rc == -1) {
                /* We handle the non-blocking connect as described in
                 * section 16.3 "Non-blocking connect" of UNIX Network
                 * Programming vol 1, 3rd edition. EWOULDBLOCK/EINPROGRESS
                 * mean the connect is still in flight; completion is
                 * detected by asking for write interest below. */
                if (errno == EWOULDBLOCK || errno == EINPROGRESS) {
                    // For SSL, we first go to ZOO_SSL_CONNECTING_STATE
                    if (zh->fd->cert != NULL)
                        zh->state = ZOO_SSL_CONNECTING_STATE;
                    else
                        zh->state = ZOO_CONNECTING_STATE;
                } else {
                    rc = handle_socket_error_msg(zh,
                                                 __LINE__,
                                                 __func__,
                                                 ZCONNECTIONLOSS,
                                                 "connect() call failed");
                    return api_epilog(zh, rc);
                }
            } else {
#ifdef HAVE_OPENSSL_H
                if (zh->fd->cert != NULL) {
                    // We do SSL_connect() here
                    if (init_ssl_for_handler(zh) != ZOK) {
                        /* Pair with api_prolog() like every other exit
                         * path. Returning directly would skip api_epilog(). */
                        return api_epilog(zh, ZSSLCONNECTIONERROR);
                    }
                }
#endif
                rc = prime_connection(zh);
                if (rc != 0) {
                    return api_epilog(zh,rc);
                }
                LOG_INFO(LOGCALLBACK(zh),
                         "Initiated connection to server %s",
                         format_endpoint_info(&zh->addr_cur));
            }
            *tv = get_timeval(zh->recv_timeout/3);
        }
        *fd = zh->fd->sock;
        /* Restart all idle/ping timers from this (re)connection attempt. */
        zh->last_recv = now;
        zh->last_send = now;
        zh->last_ping = now;
        zh->last_ping_rw = now;
        zh->ping_rw_timeout = MIN_RW_TIMEOUT;
    }
    if (zh->fd->sock != -1) {
        int idle_recv = calculate_interval(&zh->last_recv, &now);
        int idle_send = calculate_interval(&zh->last_send, &now);
        int recv_to = zh->recv_timeout*2/3 - idle_recv;
        int send_to = zh->recv_timeout/3;
        // have we exceeded the receive timeout threshold?
        // (the SSL handshake phase is exempt from this check)
        if (recv_to <= 0 && zh->state != ZOO_SSL_CONNECTING_STATE) {
            // We gotta cut our losses and connect to someone else
#ifdef _WIN32
            errno = WSAETIMEDOUT;
#else
            errno = ETIMEDOUT;
#endif
            *interest=0;
            *tv = get_timeval(0);
            return api_epilog(zh,handle_socket_error_msg(zh,
                    __LINE__, __func__, ZOPERATIONTIMEOUT,
                    "connection to %s timed out (exceeded timeout by %dms)",
                    format_endpoint_info(&zh->addr_cur),
                    -recv_to));
        }
        // We only allow 1/3 of our timeout time to expire before sending
        // a PING
        if (is_connected(zh)) {
            send_to = zh->recv_timeout/3 - idle_send;
            if (send_to <= 0) {
                /* Only ping when no request is outstanding. */
                if (zh->sent_requests.head == 0) {
                    rc = send_ping(zh);
                    if (rc < 0) {
                        LOG_ERROR(LOGCALLBACK(zh), "failed to send PING request (zk retcode=%d)",rc);
                        return api_epilog(zh,rc);
                    }
                }
                send_to = zh->recv_timeout/3;
            }
        }
        // If we are in read-only mode, seek a read/write server.
        // The probe interval doubles after each attempt, up to MAX_RW_TIMEOUT.
        if (zh->state == ZOO_READONLY_STATE) {
            int idle_ping_rw = calculate_interval(&zh->last_ping_rw, &now);
            if (idle_ping_rw >= zh->ping_rw_timeout) {
                zh->last_ping_rw = now;
                idle_ping_rw = 0;
                zh->ping_rw_timeout = min(zh->ping_rw_timeout * 2,
                                          MAX_RW_TIMEOUT);
                if (ping_rw_server(zh)) {
                    struct sockaddr_storage addr;
                    addrvec_peek(&zh->addrs, &addr);
                    zh->ping_rw_timeout = MIN_RW_TIMEOUT;
                    LOG_INFO(LOGCALLBACK(zh),
                             "r/w server found at %s",
                             format_endpoint_info(&addr));
                    /* NOTE(review): presumably tears down the current
                     * read-only connection so that a later call reconnects;
                     * confirm against cleanup(). */
                    cleanup(zh, ZOK);
                } else {
                    addrvec_next(&zh->addrs, NULL);
                }
            }
            send_to = min(send_to, zh->ping_rw_timeout - idle_ping_rw);
        }
        // choose the lesser value as the timeout
        *tv = get_timeval(min(recv_to, send_to));
        /* Remember when we expect to be called back, carrying any whole
         * seconds out of tv_usec so that it stays below 1000000. */
        zh->next_deadline.tv_sec = now.tv_sec + tv->tv_sec;
        zh->next_deadline.tv_usec = now.tv_usec + tv->tv_usec;
        if (zh->next_deadline.tv_usec >= 1000000) {
            zh->next_deadline.tv_sec += zh->next_deadline.tv_usec / 1000000;
            zh->next_deadline.tv_usec = zh->next_deadline.tv_usec % 1000000;
        }
        *interest = ZOOKEEPER_READ;
        /* we are interested in a write if we are connected and have something
         * to send, or we are waiting for a connect to finish. */
        if ((zh->to_send.head && (is_connected(zh) || is_sasl_auth_in_progress(zh)))
            || zh->state == ZOO_CONNECTING_STATE
            || zh->state == ZOO_SSL_CONNECTING_STATE) {
            *interest |= ZOOKEEPER_WRITE;
        }
    }
    return api_epilog(zh,ZOK);
}