static int JK_METHOD service()

in native/common/jk_lb_worker.c [1199:1696]


static int JK_METHOD service(jk_endpoint_t *e,
                             jk_ws_service_t *s,
                             jk_log_context_t *l, int *is_error)
{
    lb_endpoint_t *p;
    int attempt = 0;
    lb_sub_worker_t *prec = NULL;
    int num_of_workers;
    int first = 1;
    int was_forced = 0;
    int recoverable = JK_TRUE;
    int rc = JK_UNSET;
    char *sessionid = NULL;
    int i;
    int retry = 0;

    JK_TRACE_ENTER(l);

    if (!e || !e->endpoint_private || !s || !is_error) {
        JK_LOG_NULL_PARAMS(l);
        if (is_error)
            *is_error = JK_HTTP_SERVER_ERROR;
        JK_TRACE_EXIT(l);
        return JK_FALSE;
    }

    p = e->endpoint_private;
    num_of_workers = p->worker->num_of_workers;

    /* Set returned error to OK */
    *is_error = JK_HTTP_OK;

    if (p->worker->sequence < p->worker->s->h.sequence)
        jk_lb_pull(p->worker, JK_FALSE, l);
    for (i = 0; i < num_of_workers; i++) {
        lb_sub_worker_t *rec;
        ajp_worker_t *aw;
        jk_log(l, JK_LOG_DEBUG, "LB - num_of_workers: %d, retry: %d, lb_retries: %d", num_of_workers, i, p->worker->lb_retries);
        rec = &(p->worker->lb_workers[i]);
        aw = (ajp_worker_t *)rec->worker->worker_private;
        if (rec->s->state == JK_LB_STATE_BUSY) {
            if ((aw->busy_limit <= 0 || aw->s->busy < aw->busy_limit) &&
                ajp_has_endpoint(rec->worker, l)) {
                if (JK_IS_DEBUG_LEVEL(l))
                    jk_log(l, JK_LOG_DEBUG,
                           "worker %s busy state ended",
                           rec->name);
                rec->s->state = JK_LB_STATE_OK;
            }
        }
        /* Copy the shared state info */
        p->states[i] = rec->s->state;
    }

    /* set the recovery post, for LB mode */
    s->reco_buf = jk_b_new(s->pool);
    if (!s->reco_buf) {
        *is_error = JK_HTTP_SERVER_ERROR;
        jk_log(l, JK_LOG_ERROR,
               "Failed allocating AJP message");
        JK_TRACE_EXIT(l);
        return JK_SERVER_ERROR;
    }
    if (jk_b_set_buffer_size(s->reco_buf, p->worker->max_packet_size)) {
        *is_error = JK_HTTP_SERVER_ERROR;
        jk_log(l, JK_LOG_ERROR,
               "Failed allocating AJP message buffer of %d bytes.", p->worker->max_packet_size);
        JK_TRACE_EXIT(l);
        return JK_SERVER_ERROR;
    }
    jk_b_reset(s->reco_buf);
    s->reco_status = RECO_INITED;

    if (p->worker->sticky_session && s->extension.sticky_ignore != JK_TRUE) {
        /* Use sessionid only if sticky_session is
         * defined and not overwritten for this load balancer
         */
        sessionid = get_sessionid(s, p->worker, l);
    }
    if (JK_IS_DEBUG_LEVEL(l))
        jk_log(l, JK_LOG_DEBUG,
               "service sticky_session=%d id='%s'",
               p->worker->sticky_session, sessionid ? sessionid : "empty");

    while (recoverable == JK_TRUE) {
        if (JK_IS_DEBUG_LEVEL(l))
            jk_log(l, JK_LOG_DEBUG, "attempt %d, max attempts %d, worker count %d",
                   attempt, p->worker->lb_retries, num_of_workers);
        if (attempt >= num_of_workers || attempt >= p->worker->lb_retries) {
            retry++;
            if (retry >= p->worker->retries) {
                /* Done with retrying */
                break;
            }
            if (JK_IS_DEBUG_LEVEL(l))
                jk_log(l, JK_LOG_DEBUG,
                       "retry %d, sleeping for %d ms before retrying",
                       retry, p->worker->retry_interval);
            jk_sleep(p->worker->retry_interval);
            /* Pull shared memory if something changed during sleep */
            if (p->worker->sequence < p->worker->s->h.sequence)
                jk_lb_pull(p->worker, JK_FALSE, l);
            for (i = 0; i < num_of_workers; i++) {
                /* Copy the shared state info */
                p->states[i] = p->worker->lb_workers[i].s->state;
            }
            attempt = 0;
        }
        rc = JK_FALSE;
        *is_error = JK_HTTP_SERVER_BUSY;
        i = get_most_suitable_worker(s, p->worker, sessionid, p->states, l);
        if (i >= 0) {
            int r;
            int is_service_error = JK_HTTP_OK;
            lb_sub_worker_t *rec = &(p->worker->lb_workers[i]);
            ajp_worker_t *aw = (ajp_worker_t *)rec->worker->worker_private;
            jk_endpoint_t *end = NULL;
            int activation = s->extension.activation ?
                             s->extension.activation[i] :
                             JK_LB_ACTIVATION_UNSET;
            if (activation == JK_LB_ACTIVATION_UNSET)
                activation = rec->activation;
            if (!s->route)
                s->route = rec->route;
            s->activation = jk_lb_get_activation_direct(activation, l);
            prec = rec;

            if (JK_IS_DEBUG_LEVEL(l))
                jk_log(l, JK_LOG_DEBUG,
                       "service worker=%s route=%s failover=%s",
                       rec->name, s->route, s->sticky ? "false" : "true");

            if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
                jk_shm_lock();
            if (rec->s->state == JK_LB_STATE_RECOVER) {
                rec->s->state  = JK_LB_STATE_PROBE;
                p->states[rec->i] = JK_LB_STATE_PROBE;
            }
            if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
                jk_shm_unlock();

            r = rec->worker->get_endpoint(rec->worker, &end, l);
            if (!r || !end) {
                /* If we can not get the endpoint
                 * mark the worker as busy rather then
                 * as in error if the retry number is
                 * greater then the number of retries.
                 */
                if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
                    jk_shm_lock();
                if (rec->s->state != JK_LB_STATE_ERROR) {
                    rec->s->state  = JK_LB_STATE_BUSY;
                    p->states[rec->i] = JK_LB_STATE_BUSY;
                }
                if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
                    jk_shm_unlock();
                jk_log(l, JK_LOG_INFO,
                       "could not get free endpoint for worker %s (%d retries)",
                       rec->name, retry);
            }
            else {
                int service_stat = JK_UNSET;
                jk_uint64_t rd = 0;
                jk_uint64_t wr = 0;
                int busy;
                if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
                    jk_shm_lock();

                /* Increment the number of workers serving request */
                busy = JK_ATOMIC_INCREMENT(&(p->worker->s->busy));
                if (busy > p->worker->s->max_busy)
                    p->worker->s->max_busy = busy;
                if (p->worker->lbmethod == JK_LB_METHOD_REQUESTS ||
                    p->worker->lbmethod == JK_LB_METHOD_BUSYNESS ||
                    (!sessionid &&
                     s->extension.stateless != JK_TRUE &&
                     (p->worker->lbmethod == JK_LB_METHOD_SESSIONS ||
                      p->worker->lbmethod == JK_LB_METHOD_NEXT)))
                    rec->s->lb_value += rec->lb_mult;
                if (!sessionid && s->extension.stateless != JK_TRUE) {
                    rec->s->sessions++;
                }
                if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
                    jk_shm_unlock();

                if (!s->sticky && (s->extension.set_session_cookie || p->worker->set_session_cookie)) {
                    char **old_names = s->resp_headers_names;
                    char **old_values = s->resp_headers_values;
                    s->resp_headers_names  = jk_pool_alloc(s->pool,
                                                      (s->num_resp_headers + 1) * sizeof(char *));
                    s->resp_headers_values = jk_pool_alloc(s->pool,
                                                      (s->num_resp_headers + 1) * sizeof(char *));
                    if (!s->resp_headers_names || !s->resp_headers_values) {
                        jk_log(l, JK_LOG_ERROR,
                               "Failed allocating %d new response headers.", s->num_resp_headers + 1);
                        s->resp_headers_names = old_names;
                        s->resp_headers_values = old_values;
                    } else if (s->num_resp_headers) {
                        memcpy(s->resp_headers_names, old_names, s->num_resp_headers * sizeof(char *));
                        memcpy(s->resp_headers_values, old_values, s->num_resp_headers * sizeof(char *));
                    }
                    s->resp_headers_names[s->num_resp_headers] = "Set-Cookie";
                    s->resp_headers_values[s->num_resp_headers] = jk_pool_strcatv(s->pool, p->worker->session_cookie,
                                                                                  "=.", rec->route,
                                                                                  NULL);
                    if (p->worker->session_cookie_path && *p->worker->session_cookie_path) {
                        s->resp_headers_values[s->num_resp_headers] = jk_pool_strcatv(s->pool, s->resp_headers_values[s->num_resp_headers],
                                                                                      ";PATH=", p->worker->session_cookie_path,
                                                                                      NULL);
                    }
                    s->resp_headers_values[s->num_resp_headers] = jk_pool_strcatv(s->pool, s->resp_headers_values[s->num_resp_headers],
                                                                                  ";HttpOnly",
                                                                                  (s->is_ssl ? ";Secure" : ""),
                                                                                  NULL);
                    if (JK_IS_DEBUG_LEVEL(l))
                        jk_log(l, JK_LOG_DEBUG, "Added cookie header '%s' with value '%s' ",
                               s->resp_headers_names[s->num_resp_headers],
                               s->resp_headers_values[s->num_resp_headers]);
                    s->num_resp_headers++;
                }
                service_stat = end->service(end, s, l, &is_service_error);
                rd = end->rd;
                wr = end->wr;
                recoverable = end->recoverable;
                *is_error = is_service_error;
                end->done(&end, l);

                if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
                    jk_shm_lock();

                /* Update partial reads and writes if any */
                if (p->worker->lbmethod == JK_LB_METHOD_TRAFFIC) {
                    rec->s->lb_value += (rd+wr)*rec->lb_mult;
                }
                else if (p->worker->lbmethod == JK_LB_METHOD_BUSYNESS) {
                    if (rec->s->lb_value >= rec->lb_mult) {
                        rec->s->lb_value -= rec->lb_mult;
                    }
                    else {
                        rec->s->lb_value = 0;
                        if (JK_IS_DEBUG_LEVEL(l)) {
                            jk_log(l, JK_LOG_DEBUG,
                                   "worker %s has load value to low (%"
                                   JK_UINT64_T_FMT
                                   " < %"
                                   JK_UINT64_T_FMT
                                   ") ",
                                   "- correcting to 0",
                                   rec->name,
                                   rec->s->lb_value,
                                   rec->lb_mult);
                        }
                    }
                }

                /* When have an endpoint and we ran a request, assume
                 * we are OK, unless we last were in error.
                 * We will below explicitely set OK or ERROR according
                 * to the returned service_stat.
                 */
                if (rec->s->state != JK_LB_STATE_ERROR) {
                    rec->s->state  = JK_LB_STATE_OK;
                    p->states[rec->i] = JK_LB_STATE_OK;
                }
                /* Decrement the busy worker count.
                 * Check if the busy was reset to zero by graceful
                 * restart of the server.
                 */
                JK_ATOMIC_DECREMENT(&(p->worker->s->busy));
                if (service_stat == JK_TRUE) {
                    /*
                     * Successful request.
                     */
                    rec->s->state  = JK_LB_STATE_OK;
                    p->states[rec->i] = JK_LB_STATE_OK;
                    rec->s->first_error_time = 0;
                    rec->s->last_error_time = 0;
                    rc = JK_TRUE;
                    recoverable = JK_UNSET;
                }
                else if (service_stat == JK_CLIENT_ERROR) {
                    /*
                     * Client error !!!
                     * Since this is bad request do not fail over.
                     */
                    rec->s->state  = JK_LB_STATE_OK;
                    p->states[rec->i] = JK_LB_STATE_ERROR;
                    rec->s->first_error_time = 0;
                    rec->s->last_error_time = 0;
                    rc = JK_CLIENT_ERROR;
                    recoverable = JK_FALSE;
                }
                else if (service_stat == JK_SERVER_ERROR) {
                    /*
                     * Internal JK server error.
                     * Keep previous global state.
                     * Do not try to reuse the same node for the same request.
                     * Failing over to another node could help.
                     */
                    p->states[rec->i] = JK_LB_STATE_ERROR;
                    rc = JK_FALSE;
                }
                else if (service_stat == JK_AJP_PROTOCOL_ERROR) {
                    /*
                     * We've received a bad AJP message from the backend.
                     * Keep previous global state.
                     * Do not try to reuse the same node for the same request.
                     * Failing over to another node could help.
                     */
                    p->states[rec->i] = JK_LB_STATE_ERROR;
                    rc = JK_FALSE;
                }
                else if (service_stat == JK_STATUS_ERROR) {
                    /*
                     * Status code configured as service is down.
                     * The node is fine.
                     * Do not try to reuse the same node for the same request.
                     * Failing over to another node could help.
                     */
                    rec->s->state  = JK_LB_STATE_OK;
                    p->states[rec->i] = JK_LB_STATE_ERROR;
                    rec->s->first_error_time = 0;
                    rec->s->last_error_time = 0;
                    rc = JK_FALSE;
                }
                else if (service_stat == JK_BUSY_ERROR) {
                    /*
                     * Node was busy.
                     * Do not try to reuse the same node for the same request.
                     * Failing over to another node could help.
                     */
                    rec->s->state  = JK_LB_STATE_BUSY;
                    p->states[rec->i] = JK_LB_STATE_BUSY;
                    rc = JK_FALSE;
                }
                else if (service_stat == JK_STATUS_FATAL_ERROR) {
                    /*
                     * Status code configured as service is down.
                     * Mark the node as bad.
                     * Do not try to reuse the same node for the same request.
                     * Failing over to another node could help.
                     */
                    rec->s->errors++;
                    rec->s->state = JK_LB_STATE_ERROR;
                    p->states[rec->i] = JK_LB_STATE_ERROR;
                    rec->s->first_error_time = time(NULL);
                    rec->s->last_error_time = rec->s->first_error_time;
                    rc = JK_FALSE;
                }
                else if (service_stat == JK_REPLY_TIMEOUT) {
                    if (aw->s->reply_timeouts > (unsigned)p->worker->max_reply_timeouts) {
                        /*
                         * Service failed - to many reply timeouts
                         * Mark the node as bad.
                         * Do not try to reuse the same node for the same request.
                         * Failing over to another node could help.
                         */
                        rec->s->errors++;
                        rec->s->state = JK_LB_STATE_ERROR;
                        p->states[rec->i] = JK_LB_STATE_ERROR;
                        rec->s->first_error_time = time(NULL);
                        rec->s->last_error_time = rec->s->first_error_time;
                    }
                    else {
                        /*
                         * Reply timeout, but not yet too many of them.
                         * Keep previous global state.
                         * Do not try to reuse the same node for the same request.
                         * Failing over to another node could help.
                         */
                        p->states[rec->i] = JK_LB_STATE_ERROR;
                    }
                    rc = JK_FALSE;
                }
                else {
                    /*
                     * Various unspecific error cases.
                     * Keep previous global state, if we are not in local error since to long.
                     * Do not try to reuse the same node for the same request.
                     * Failing over to another node could help.
                     */
                    time_t now = time(NULL);
                    rec->s->errors++;
                    if (aw->s->busy == 0 ||
                        p->worker->error_escalation_time == 0 ||
                        (rec->s->first_error_time > 0 &&
                         (int)difftime(now, rec->s->first_error_time) >= p->worker->error_escalation_time)) {
                        if (JK_IS_DEBUG_LEVEL(l))
                            jk_log(l, JK_LOG_DEBUG,
                                   "worker %s escalating local error to global error",
                                   rec->name);
                        rec->s->state = JK_LB_STATE_ERROR;
                    }
                    p->states[rec->i] = JK_LB_STATE_ERROR;
                    if (rec->s->first_error_time == 0) {
                        rec->s->first_error_time = now;
                    }
                    rec->s->last_error_time = now;
                    rc = JK_FALSE;
                }
                if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
                    jk_shm_unlock();
                if (p->states[rec->i] == JK_LB_STATE_ERROR)
                    jk_log(l, JK_LOG_INFO,
                           "service failed, worker %s is in %serror state",
                           rec->name,
                           rec->s->state == JK_LB_STATE_ERROR ? "" : "local ");
            }
            if (recoverable == JK_TRUE) {
                /*
                 * Error is recoverable by submitting the request to
                 * another worker... Lets try to do that.
                 */
                if (JK_IS_DEBUG_LEVEL(l))
                    jk_log(l, JK_LOG_DEBUG,
                           "recoverable error... will try to recover on other worker");
            }
            else {
                /*
                 * Error is not recoverable - break with an error.
                 */
                if (rc == JK_CLIENT_ERROR)
                    jk_log(l, JK_LOG_INFO,
                           "unrecoverable error %d, request failed."
                           " Client failed in the middle of request,"
                           " we can't recover to another instance.",
                           *is_error);
                else if (rc != JK_TRUE)
                    jk_log(l, JK_LOG_ERROR,
                           "unrecoverable error %d, request failed."
                           " Tomcat failed in the middle of request,"
                           " we can't recover to another instance.",
                           *is_error);
            }
            if (first == 1 && s->add_log_items) {
                first = 0;
                lb_add_log_items(s, lb_first_log_names, prec, l);
            }
        }
        else {
            /* No more workers left ... */
            if (!was_forced) {
                int nf;
                /* Force recovery only once.
                 * If it still fails, Tomcat is still disconnected.
                 */
                jk_shm_lock();
                nf = force_recovery(p->worker, p->states, l);
                jk_shm_unlock();
                was_forced = 1;
                if (nf) {
                    /* We have forced recovery.
                     * Reset the service loop and go again
                     */
                    prec = NULL;
                    jk_log(l, JK_LOG_INFO,
                           "Forcing recovery once for %d workers", nf);
                    continue;
                }
                else {
                    /* No workers in error state.
                     * Somebody set them all to disabled?
                     */
                    jk_log(l, JK_LOG_INFO,
                           "All tomcat instances failed, no more workers "
                           "left for recovery (attempt=%d, retry=%d)",
                           attempt + 1, retry);
                    *is_error = JK_HTTP_SERVER_BUSY;
                    rc = JK_FALSE;
                }
            }
            else {
                jk_log(l, JK_LOG_INFO,
                       "All tomcat instances failed, no more workers "
                       "left (attempt=%d, retry=%d)",
                       attempt + 1, retry);
                *is_error = JK_HTTP_SERVER_BUSY;
                rc = JK_FALSE;
            }
        }
        attempt++;
    }
    if (recoverable == JK_TRUE) {
        jk_log(l, JK_LOG_INFO,
               "All tomcat instances are busy or in error state");
        /* rc and http error must be set above */
    }
    if (rc == JK_FALSE) {
        jk_log(l, JK_LOG_ERROR,
               "All tomcat instances failed, no more workers left");
    }
    if (prec && s->add_log_items) {
        lb_add_log_items(s, lb_last_log_names, prec, l);
    }

    JK_TRACE_EXIT(l);
    return rc;
}