in native/common/jk_lb_worker.c [1199:1696]
/*
 * Load balancer service callback: forward the request to one of the
 * balanced member workers, failing over to other members and retrying
 * while the error is reported as recoverable.
 *
 * Parameters:
 *   e        - load balancer endpoint; e->endpoint_private must point to
 *              the lb_endpoint_t of this balancer.
 *   s        - the web server request; its recovery buffer, route,
 *              activation and (optionally) response headers are updated.
 *   l        - log context.
 *   is_error - out parameter receiving an HTTP status: JK_HTTP_OK
 *              initially, JK_HTTP_SERVER_ERROR on invalid parameters or
 *              allocation failure of the recovery buffer,
 *              JK_HTTP_SERVER_BUSY before each attempt, and finally the
 *              status reported by the member worker's service call.
 *
 * Returns:
 *   JK_FALSE        - invalid parameters, or no member could serve the
 *                     request.
 *   JK_SERVER_ERROR - the recovery buffer could not be allocated.
 *   JK_TRUE         - a member served the request successfully.
 *   JK_CLIENT_ERROR - the member reported a client error (no failover).
 *   JK_UNSET        - the retry limit was hit before any attempt was made.
 */
static int JK_METHOD service(jk_endpoint_t *e,
                             jk_ws_service_t *s,
                             jk_log_context_t *l, int *is_error)
{
    lb_endpoint_t *p;
    int attempt = 0;
    lb_sub_worker_t *prec = NULL;
    int num_of_workers;
    int first = 1;
    int was_forced = 0;
    int recoverable = JK_TRUE;
    int rc = JK_UNSET;
    char *sessionid = NULL;
    int i;
    int retry = 0;

    JK_TRACE_ENTER(l);

    if (!e || !e->endpoint_private || !s || !is_error) {
        JK_LOG_NULL_PARAMS(l);
        if (is_error)
            *is_error = JK_HTTP_SERVER_ERROR;
        JK_TRACE_EXIT(l);
        return JK_FALSE;
    }

    p = e->endpoint_private;
    num_of_workers = p->worker->num_of_workers;

    /* Set returned error to OK */
    *is_error = JK_HTTP_OK;

    /* Refresh the local configuration if the shared copy is newer */
    if (p->worker->sequence < p->worker->s->h.sequence)
        jk_lb_pull(p->worker, JK_FALSE, l);

    for (i = 0; i < num_of_workers; i++) {
        lb_sub_worker_t *rec;
        ajp_worker_t *aw;

        jk_log(l, JK_LOG_DEBUG, "LB - num_of_workers: %d, retry: %d, lb_retries: %d", num_of_workers, i, p->worker->lb_retries);
        rec = &(p->worker->lb_workers[i]);
        aw = (ajp_worker_t *)rec->worker->worker_private;
        if (rec->s->state == JK_LB_STATE_BUSY) {
            /* Leave the busy state once the member is below its busy
             * limit (or has none) and has an endpoint available.
             */
            if ((aw->busy_limit <= 0 || aw->s->busy < aw->busy_limit) &&
                ajp_has_endpoint(rec->worker, l)) {
                if (JK_IS_DEBUG_LEVEL(l))
                    jk_log(l, JK_LOG_DEBUG,
                           "worker %s busy state ended",
                           rec->name);
                rec->s->state = JK_LB_STATE_OK;
            }
        }
        /* Copy the shared state info */
        p->states[i] = rec->s->state;
    }

    /* set the recovery post, for LB mode */
    s->reco_buf = jk_b_new(s->pool);
    if (!s->reco_buf) {
        *is_error = JK_HTTP_SERVER_ERROR;
        jk_log(l, JK_LOG_ERROR,
               "Failed allocating AJP message");
        JK_TRACE_EXIT(l);
        return JK_SERVER_ERROR;
    }
    if (jk_b_set_buffer_size(s->reco_buf, p->worker->max_packet_size)) {
        *is_error = JK_HTTP_SERVER_ERROR;
        jk_log(l, JK_LOG_ERROR,
               "Failed allocating AJP message buffer of %d bytes.", p->worker->max_packet_size);
        JK_TRACE_EXIT(l);
        return JK_SERVER_ERROR;
    }
    jk_b_reset(s->reco_buf);
    s->reco_status = RECO_INITED;

    if (p->worker->sticky_session && s->extension.sticky_ignore != JK_TRUE) {
        /* Use sessionid only if sticky_session is
         * defined and not overwritten for this load balancer
         */
        sessionid = get_sessionid(s, p->worker, l);
    }
    if (JK_IS_DEBUG_LEVEL(l))
        jk_log(l, JK_LOG_DEBUG,
               "service sticky_session=%d id='%s'",
               p->worker->sticky_session, sessionid ? sessionid : "empty");

    while (recoverable == JK_TRUE) {
        if (JK_IS_DEBUG_LEVEL(l))
            jk_log(l, JK_LOG_DEBUG, "attempt %d, max attempts %d, worker count %d",
                   attempt, p->worker->lb_retries, num_of_workers);
        if (attempt >= num_of_workers || attempt >= p->worker->lb_retries) {
            /* One full round of attempts is used up: either give up or
             * sleep and start another round.
             */
            retry++;
            if (retry >= p->worker->retries) {
                /* Done with retrying */
                break;
            }
            if (JK_IS_DEBUG_LEVEL(l))
                jk_log(l, JK_LOG_DEBUG,
                       "retry %d, sleeping for %d ms before retrying",
                       retry, p->worker->retry_interval);
            jk_sleep(p->worker->retry_interval);
            /* Pull shared memory if something changed during sleep */
            if (p->worker->sequence < p->worker->s->h.sequence)
                jk_lb_pull(p->worker, JK_FALSE, l);
            for (i = 0; i < num_of_workers; i++) {
                /* Copy the shared state info */
                p->states[i] = p->worker->lb_workers[i].s->state;
            }
            attempt = 0;
        }
        rc = JK_FALSE;
        *is_error = JK_HTTP_SERVER_BUSY;
        i = get_most_suitable_worker(s, p->worker, sessionid, p->states, l);
        if (i >= 0) {
            int r;
            int is_service_error = JK_HTTP_OK;
            lb_sub_worker_t *rec = &(p->worker->lb_workers[i]);
            ajp_worker_t *aw = (ajp_worker_t *)rec->worker->worker_private;
            jk_endpoint_t *end = NULL;
            /* A per-request activation overrides the member's own one */
            int activation = s->extension.activation ?
                             s->extension.activation[i] :
                             JK_LB_ACTIVATION_UNSET;
            if (activation == JK_LB_ACTIVATION_UNSET)
                activation = rec->activation;
            if (!s->route)
                s->route = rec->route;
            s->activation = jk_lb_get_activation_direct(activation, l);
            prec = rec;

            if (JK_IS_DEBUG_LEVEL(l))
                jk_log(l, JK_LOG_DEBUG,
                       "service worker=%s route=%s failover=%s",
                       rec->name, s->route, s->sticky ? "false" : "true");

            if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
                jk_shm_lock();
            if (rec->s->state == JK_LB_STATE_RECOVER) {
                rec->s->state = JK_LB_STATE_PROBE;
                p->states[rec->i] = JK_LB_STATE_PROBE;
            }
            if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
                jk_shm_unlock();

            r = rec->worker->get_endpoint(rec->worker, &end, l);
            if (!r || !end) {
                /* If we can not get the endpoint
                 * mark the worker as busy rather than
                 * as in error if the retry number is
                 * greater than the number of retries.
                 */
                if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
                    jk_shm_lock();
                if (rec->s->state != JK_LB_STATE_ERROR) {
                    rec->s->state = JK_LB_STATE_BUSY;
                    p->states[rec->i] = JK_LB_STATE_BUSY;
                }
                if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
                    jk_shm_unlock();
                jk_log(l, JK_LOG_INFO,
                       "could not get free endpoint for worker %s (%d retries)",
                       rec->name, retry);
            }
            else {
                int service_stat = JK_UNSET;
                jk_uint64_t rd = 0;
                jk_uint64_t wr = 0;
                int busy;

                if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
                    jk_shm_lock();

                /* Increment the number of workers serving request */
                busy = JK_ATOMIC_INCREMENT(&(p->worker->s->busy));
                if (busy > p->worker->s->max_busy)
                    p->worker->s->max_busy = busy;
                if (p->worker->lbmethod == JK_LB_METHOD_REQUESTS ||
                    p->worker->lbmethod == JK_LB_METHOD_BUSYNESS ||
                    (!sessionid &&
                     s->extension.stateless != JK_TRUE &&
                     (p->worker->lbmethod == JK_LB_METHOD_SESSIONS ||
                      p->worker->lbmethod == JK_LB_METHOD_NEXT)))
                    rec->s->lb_value += rec->lb_mult;
                if (!sessionid && s->extension.stateless != JK_TRUE) {
                    rec->s->sessions++;
                }
                if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
                    jk_shm_unlock();

                if (!s->sticky && (s->extension.set_session_cookie || p->worker->set_session_cookie)) {
                    char **old_names = s->resp_headers_names;
                    char **old_values = s->resp_headers_values;
                    s->resp_headers_names = jk_pool_alloc(s->pool,
                            (s->num_resp_headers + 1) * sizeof(char *));
                    s->resp_headers_values = jk_pool_alloc(s->pool,
                            (s->num_resp_headers + 1) * sizeof(char *));
                    if (!s->resp_headers_names || !s->resp_headers_values) {
                        /* Keep the previous header arrays and skip the
                         * cookie: they have no room for another entry.
                         */
                        jk_log(l, JK_LOG_ERROR,
                               "Failed allocating %d new response headers.", s->num_resp_headers + 1);
                        s->resp_headers_names = old_names;
                        s->resp_headers_values = old_values;
                    }
                    else {
                        if (s->num_resp_headers) {
                            memcpy(s->resp_headers_names, old_names, s->num_resp_headers * sizeof(char *));
                            memcpy(s->resp_headers_values, old_values, s->num_resp_headers * sizeof(char *));
                        }
                        /* Append the session cookie in the new last slot */
                        s->resp_headers_names[s->num_resp_headers] = "Set-Cookie";
                        s->resp_headers_values[s->num_resp_headers] = jk_pool_strcatv(s->pool, p->worker->session_cookie,
                                                                                      "=.", rec->route,
                                                                                      NULL);
                        if (p->worker->session_cookie_path && *p->worker->session_cookie_path) {
                            s->resp_headers_values[s->num_resp_headers] = jk_pool_strcatv(s->pool, s->resp_headers_values[s->num_resp_headers],
                                                                                          ";PATH=", p->worker->session_cookie_path,
                                                                                          NULL);
                        }
                        s->resp_headers_values[s->num_resp_headers] = jk_pool_strcatv(s->pool, s->resp_headers_values[s->num_resp_headers],
                                                                                      ";HttpOnly",
                                                                                      (s->is_ssl ? ";Secure" : ""),
                                                                                      NULL);
                        if (JK_IS_DEBUG_LEVEL(l))
                            jk_log(l, JK_LOG_DEBUG, "Added cookie header '%s' with value '%s' ",
                                   s->resp_headers_names[s->num_resp_headers],
                                   s->resp_headers_values[s->num_resp_headers]);
                        s->num_resp_headers++;
                    }
                }

                service_stat = end->service(end, s, l, &is_service_error);
                /* Read everything we need from the endpoint before
                 * handing it back with done().
                 */
                rd = end->rd;
                wr = end->wr;
                recoverable = end->recoverable;
                *is_error = is_service_error;
                end->done(&end, l);

                if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
                    jk_shm_lock();

                /* Update partial reads and writes if any */
                if (p->worker->lbmethod == JK_LB_METHOD_TRAFFIC) {
                    rec->s->lb_value += (rd+wr)*rec->lb_mult;
                }
                else if (p->worker->lbmethod == JK_LB_METHOD_BUSYNESS) {
                    if (rec->s->lb_value >= rec->lb_mult) {
                        rec->s->lb_value -= rec->lb_mult;
                    }
                    else {
                        /* Log before resetting so that the message shows
                         * the value that was too low.
                         */
                        if (JK_IS_DEBUG_LEVEL(l)) {
                            jk_log(l, JK_LOG_DEBUG,
                                   "worker %s has load value to low (%"
                                   JK_UINT64_T_FMT
                                   " < %"
                                   JK_UINT64_T_FMT
                                   ") "
                                   "- correcting to 0",
                                   rec->name,
                                   rec->s->lb_value,
                                   rec->lb_mult);
                        }
                        rec->s->lb_value = 0;
                    }
                }

                /* When have an endpoint and we ran a request, assume
                 * we are OK, unless we last were in error.
                 * We will below explicitly set OK or ERROR according
                 * to the returned service_stat.
                 */
                if (rec->s->state != JK_LB_STATE_ERROR) {
                    rec->s->state = JK_LB_STATE_OK;
                    p->states[rec->i] = JK_LB_STATE_OK;
                }
                /* Decrement the busy worker count.
                 * Check if the busy was reset to zero by graceful
                 * restart of the server.
                 */
                JK_ATOMIC_DECREMENT(&(p->worker->s->busy));
                if (service_stat == JK_TRUE) {
                    /*
                     * Successful request.
                     */
                    rec->s->state = JK_LB_STATE_OK;
                    p->states[rec->i] = JK_LB_STATE_OK;
                    rec->s->first_error_time = 0;
                    rec->s->last_error_time = 0;
                    rc = JK_TRUE;
                    recoverable = JK_UNSET;
                }
                else if (service_stat == JK_CLIENT_ERROR) {
                    /*
                     * Client error !!!
                     * Since this is bad request do not fail over.
                     */
                    rec->s->state = JK_LB_STATE_OK;
                    p->states[rec->i] = JK_LB_STATE_ERROR;
                    rec->s->first_error_time = 0;
                    rec->s->last_error_time = 0;
                    rc = JK_CLIENT_ERROR;
                    recoverable = JK_FALSE;
                }
                else if (service_stat == JK_SERVER_ERROR) {
                    /*
                     * Internal JK server error.
                     * Keep previous global state.
                     * Do not try to reuse the same node for the same request.
                     * Failing over to another node could help.
                     */
                    p->states[rec->i] = JK_LB_STATE_ERROR;
                    rc = JK_FALSE;
                }
                else if (service_stat == JK_AJP_PROTOCOL_ERROR) {
                    /*
                     * We've received a bad AJP message from the backend.
                     * Keep previous global state.
                     * Do not try to reuse the same node for the same request.
                     * Failing over to another node could help.
                     */
                    p->states[rec->i] = JK_LB_STATE_ERROR;
                    rc = JK_FALSE;
                }
                else if (service_stat == JK_STATUS_ERROR) {
                    /*
                     * Status code configured as service is down.
                     * The node is fine.
                     * Do not try to reuse the same node for the same request.
                     * Failing over to another node could help.
                     */
                    rec->s->state = JK_LB_STATE_OK;
                    p->states[rec->i] = JK_LB_STATE_ERROR;
                    rec->s->first_error_time = 0;
                    rec->s->last_error_time = 0;
                    rc = JK_FALSE;
                }
                else if (service_stat == JK_BUSY_ERROR) {
                    /*
                     * Node was busy.
                     * Do not try to reuse the same node for the same request.
                     * Failing over to another node could help.
                     */
                    rec->s->state = JK_LB_STATE_BUSY;
                    p->states[rec->i] = JK_LB_STATE_BUSY;
                    rc = JK_FALSE;
                }
                else if (service_stat == JK_STATUS_FATAL_ERROR) {
                    /*
                     * Status code configured as service is down.
                     * Mark the node as bad.
                     * Do not try to reuse the same node for the same request.
                     * Failing over to another node could help.
                     */
                    rec->s->errors++;
                    rec->s->state = JK_LB_STATE_ERROR;
                    p->states[rec->i] = JK_LB_STATE_ERROR;
                    rec->s->first_error_time = time(NULL);
                    rec->s->last_error_time = rec->s->first_error_time;
                    rc = JK_FALSE;
                }
                else if (service_stat == JK_REPLY_TIMEOUT) {
                    if (aw->s->reply_timeouts > (unsigned)p->worker->max_reply_timeouts) {
                        /*
                         * Service failed - too many reply timeouts
                         * Mark the node as bad.
                         * Do not try to reuse the same node for the same request.
                         * Failing over to another node could help.
                         */
                        rec->s->errors++;
                        rec->s->state = JK_LB_STATE_ERROR;
                        p->states[rec->i] = JK_LB_STATE_ERROR;
                        rec->s->first_error_time = time(NULL);
                        rec->s->last_error_time = rec->s->first_error_time;
                    }
                    else {
                        /*
                         * Reply timeout, but not yet too many of them.
                         * Keep previous global state.
                         * Do not try to reuse the same node for the same request.
                         * Failing over to another node could help.
                         */
                        p->states[rec->i] = JK_LB_STATE_ERROR;
                    }
                    rc = JK_FALSE;
                }
                else {
                    /*
                     * Various unspecific error cases.
                     * Keep previous global state, if we are not in local error since too long.
                     * Do not try to reuse the same node for the same request.
                     * Failing over to another node could help.
                     */
                    time_t now = time(NULL);
                    rec->s->errors++;
                    if (aw->s->busy == 0 ||
                        p->worker->error_escalation_time == 0 ||
                        (rec->s->first_error_time > 0 &&
                         (int)difftime(now, rec->s->first_error_time) >= p->worker->error_escalation_time)) {
                        if (JK_IS_DEBUG_LEVEL(l))
                            jk_log(l, JK_LOG_DEBUG,
                                   "worker %s escalating local error to global error",
                                   rec->name);
                        rec->s->state = JK_LB_STATE_ERROR;
                    }
                    p->states[rec->i] = JK_LB_STATE_ERROR;
                    if (rec->s->first_error_time == 0) {
                        rec->s->first_error_time = now;
                    }
                    rec->s->last_error_time = now;
                    rc = JK_FALSE;
                }
                if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
                    jk_shm_unlock();
                if (p->states[rec->i] == JK_LB_STATE_ERROR)
                    jk_log(l, JK_LOG_INFO,
                           "service failed, worker %s is in %serror state",
                           rec->name,
                           rec->s->state == JK_LB_STATE_ERROR ? "" : "local ");
            }
            if (recoverable == JK_TRUE) {
                /*
                 * Error is recoverable by submitting the request to
                 * another worker... Lets try to do that.
                 */
                if (JK_IS_DEBUG_LEVEL(l))
                    jk_log(l, JK_LOG_DEBUG,
                           "recoverable error... will try to recover on other worker");
            }
            else {
                /*
                 * Error is not recoverable - break with an error.
                 */
                if (rc == JK_CLIENT_ERROR)
                    jk_log(l, JK_LOG_INFO,
                           "unrecoverable error %d, request failed."
                           " Client failed in the middle of request,"
                           " we can't recover to another instance.",
                           *is_error);
                else if (rc != JK_TRUE)
                    jk_log(l, JK_LOG_ERROR,
                           "unrecoverable error %d, request failed."
                           " Tomcat failed in the middle of request,"
                           " we can't recover to another instance.",
                           *is_error);
            }
            if (first == 1 && s->add_log_items) {
                first = 0;
                lb_add_log_items(s, lb_first_log_names, prec, l);
            }
        }
        else {
            /* No more workers left ... */
            if (!was_forced) {
                int nf;
                /* Force recovery only once.
                 * If it still fails, Tomcat is still disconnected.
                 */
                jk_shm_lock();
                nf = force_recovery(p->worker, p->states, l);
                jk_shm_unlock();
                was_forced = 1;
                if (nf) {
                    /* We have forced recovery.
                     * Reset the service loop and go again
                     */
                    prec = NULL;
                    jk_log(l, JK_LOG_INFO,
                           "Forcing recovery once for %d workers", nf);
                    continue;
                }
                else {
                    /* No workers in error state.
                     * Somebody set them all to disabled?
                     */
                    jk_log(l, JK_LOG_INFO,
                           "All tomcat instances failed, no more workers "
                           "left for recovery (attempt=%d, retry=%d)",
                           attempt + 1, retry);
                    *is_error = JK_HTTP_SERVER_BUSY;
                    rc = JK_FALSE;
                }
            }
            else {
                jk_log(l, JK_LOG_INFO,
                       "All tomcat instances failed, no more workers "
                       "left (attempt=%d, retry=%d)",
                       attempt + 1, retry);
                *is_error = JK_HTTP_SERVER_BUSY;
                rc = JK_FALSE;
            }
        }
        attempt++;
    }
    if (recoverable == JK_TRUE) {
        jk_log(l, JK_LOG_INFO,
               "All tomcat instances are busy or in error state");
        /* rc and http error must be set above */
    }
    if (rc == JK_FALSE) {
        jk_log(l, JK_LOG_ERROR,
               "All tomcat instances failed, no more workers left");
    }
    if (prec && s->add_log_items) {
        lb_add_log_items(s, lb_last_log_names, prec, l);
    }

    JK_TRACE_EXIT(l);
    return rc;
}