static void process_socket()

in server/mpm/event/event.c [1048:1339]


/*
 * Run one pass of the event MPM connection state machine for a socket on
 * the calling thread.
 *
 * When cs is NULL the socket is treated as a new connection: the conn_rec
 * and the event_conn_state_t are created from pool p, the pre_connection
 * hooks are run and the state starts at CONN_STATE_PROCESSING.  Otherwise
 * the connection already described by cs is resumed on this thread.
 *
 * Depending on cs->pub.state the connection is then either
 *  - handed to the process_connection hooks (CONN_STATE_PROCESSING, or
 *    whenever c->clogging_input_filters is set),
 *  - appended to one of the timeout queues (io_q, wc_q, ka_q) and added to
 *    event_pollset (CONN_STATE_ASYNC_WAITIO, CONN_STATE_WRITE_COMPLETION
 *    with pending output, CONN_STATE_KEEPALIVE),
 *  - accounted as suspended (CONN_STATE_SUSPENDED), or
 *  - passed to process_lingering_close() (everything else).
 *
 * Parameters:
 *  thd            thread handling the connection; stored in
 *                 c->current_thread
 *  p              pool used for all allocations of a new connection
 *  sock           client socket; only used when cs is NULL
 *  cs             existing connection state, or NULL for a new connection
 *  my_child_num,
 *  my_thread_num  used to build the connection id and to create/update the
 *                 scoreboard handle
 *
 * Returns nothing; failures are logged and result in the connection being
 * closed.
 */
static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * sock,
                          event_conn_state_t * cs, int my_child_num,
                          int my_thread_num)
{
    conn_rec *c;
    long conn_id = ID_FROM_CHILD_THREAD(my_child_num, my_thread_num);
    /* from_wc_q: set when the state was already CONN_STATE_WRITE_COMPLETION
     * on entry, i.e. it was not produced by the processing done in this call.
     */
    int clogging = 0, from_wc_q = 0;
    apr_status_t rv;
    int rc = OK;

    if (cs == NULL) {           /* This is a new connection */
        listener_poll_type *pt = apr_pcalloc(p, sizeof(*pt));
        cs = apr_pcalloc(p, sizeof(event_conn_state_t));
        cs->bucket_alloc = apr_bucket_alloc_create(p);
        ap_create_sb_handle(&cs->sbh, p, my_child_num, my_thread_num);
        c = ap_run_create_connection(p, ap_server_conf, sock,
                                     conn_id, cs->sbh, cs->bucket_alloc);
        if (!c) {
            /* No conn_rec could be created: pass the pool to
             * worker_queue_info (presumably for reuse -- confirm against
             * ap_queue_info_push_pool) and give up on this socket.
             */
            ap_queue_info_push_pool(worker_queue_info, p);
            return;
        }
        /* Account for the connection; decrement_connection_count is
         * registered as a cleanup on c->pool.
         */
        apr_atomic_inc32(&connection_count);
        apr_pool_cleanup_register(c->pool, cs, decrement_connection_count,
                                  apr_pool_cleanup_null);
        ap_set_module_config(c->conn_config, &mpm_event_module, cs);
        c->current_thread = thd;
        c->cs = &cs->pub;
        cs->c = c;
        cs->p = p;
        cs->sc = ap_get_module_config(ap_server_conf->module_config,
                                      &mpm_event_module);
        /* Prepare the poll descriptor used for every later
         * apr_pollset_add() of this connection: it refers to the client
         * socket and carries a PT_CSD poll type whose baton is cs.
         */
        cs->pfd.desc_type = APR_POLL_SOCKET;
        cs->pfd.desc.s = sock;
        pt->type = PT_CSD;
        pt->baton = cs;
        cs->pfd.client_data = pt;
        apr_pool_pre_cleanup_register(p, cs, ptrans_pre_cleanup);
        TO_QUEUE_ELEM_INIT(cs);

        ap_update_vhost_given_ip(c);

        rc = ap_pre_connection(c, sock);
        if (rc != OK && rc != DONE) {
            /* Both OK and DONE are accepted from the pre_connection hooks;
             * any other result closes the connection right away.
             */
            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(00469)
                          "process_socket: connection aborted");
            close_connection(cs);
            return;
        }

        /**
         * XXX If the platform does not have a usable way of bundling
         * accept() with a socket readability check, like Win32,
         * and there are measurable delays before the
         * socket is readable due to the first data packet arriving,
         * it might be better to create the cs on the listener thread
         * with the state set to CONN_STATE_KEEPALIVE
         *
         * FreeBSD users will want to enable the HTTP accept filter
         * module in their kernel for the highest performance
         * When the accept filter is active, sockets are kept in the
         * kernel until a HTTP request is received.
         */
        cs->pub.state = CONN_STATE_PROCESSING;
        cs->pub.sense = CONN_SENSE_DEFAULT;
    }
    else {
        /* Existing connection: rebind it to this child/thread. */
        c = cs->c;
        ap_update_sb_handle(cs->sbh, my_child_num, my_thread_num);
        notify_resume(cs, 0);
        c->current_thread = thd;
        /* Subsequent request on a conn, and thread number is part of ID */
        c->id = conn_id;
    }

    /* A connection that is already in one of the lingering close states
     * skips all processing below.
     */
    if (CONN_STATE_IS_LINGERING_CLOSE(cs->pub.state)) {
        goto lingering_close;
    }

    if (cs->pub.state == CONN_STATE_PROCESSING
        /* If we have an input filter which 'clogs' the input stream,
         * like mod_ssl used to, lets just do the normal read from input
         * filters, like the Worker MPM does. Filters that need to write
         * where they would otherwise read, or read where they would
         * otherwise write, should set the sense appropriately.
         */
         || c->clogging_input_filters) {
        /* This label is also the target of a goto from the
         * CONN_STATE_WRITE_COMPLETION handling further down, taken when
         * ap_run_input_pending() reports OK after the output was flushed.
         */
 process_connection:
        cs->pub.state = CONN_STATE_PROCESSING;

        /* Sample the flag once so that the increment and the decrement of
         * clogged_count stay paired even if the hooks change it.
         */
        clogging = c->clogging_input_filters;
        if (clogging) {
            apr_atomic_inc32(&clogged_count);
        }
        rc = ap_run_process_connection(c);
        if (clogging) {
            apr_atomic_dec32(&clogged_count);
        }
        /*
         * The process_connection hooks should set the appropriate connection
         * state upon return, for event MPM to either:
         * - CONN_STATE_LINGER: do lingering close;
         * - CONN_STATE_WRITE_COMPLETION: flush pending outputs using Timeout
         *   and wait for next incoming data using KeepAliveTimeout, then come
         *   back to process_connection() hooks;
         * - CONN_STATE_SUSPENDED: suspend the connection such that it now
         *   interacts with the MPM through suspend/resume_connection() hooks,
         *   and/or registered poll callbacks (PT_USER), and/or registered
         *   timed callbacks triggered by timer events;
         * - CONN_STATE_ASYNC_WAITIO: wait for read/write-ability of the underlying
         *   socket using Timeout and come back to process_connection() hooks when
         *   ready;
         * - CONN_STATE_KEEPALIVE: now handled by CONN_STATE_WRITE_COMPLETION
         *   to flush before waiting for next data (that might depend on it).
         * If a process_connection hook returns an error or no hook sets the state
         * to one of the above expected value, forcibly close the connection w/
         * CONN_STATE_LINGER.  This covers the cases where no process_connection
         * hook executes (DECLINED), or one returns OK w/o touching the state (i.e.
         * CONN_STATE_PROCESSING remains after the call) which can happen with
         * third-party modules not updated to work specifically with event MPM
         * while this was expected to do lingering close unconditionally with
         * worker or prefork MPMs for instance.
         */
        /* There is deliberately no default case: any other rc is left as is
         * and is caught by the "rc != OK" test right after the switch.
         */
        switch (rc) {
        case DONE:
            rc = OK; /* same as OK, fall through */
        case OK:
            if (cs->pub.state == CONN_STATE_PROCESSING) {
                cs->pub.state = CONN_STATE_LINGER;
            }
            else if (cs->pub.state == CONN_STATE_KEEPALIVE) {
                cs->pub.state = CONN_STATE_WRITE_COMPLETION;
            }
            break;
        }
        if (rc != OK || (cs->pub.state != CONN_STATE_LINGER
                         && cs->pub.state != CONN_STATE_ASYNC_WAITIO
                         && cs->pub.state != CONN_STATE_WRITE_COMPLETION
                         && cs->pub.state != CONN_STATE_SUSPENDED)) {
            /* The "unexpected " prefix is only logged when rc is zero, that
             * is when the hooks did not fail but left a state that is not
             * handled below.
             */
            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(10111)
                          "process_socket: connection processing returned %i "
                          "(%sstate %i): closing",
                          rc, rc ? "" : "unexpected ", (int)cs->pub.state);
            cs->pub.state = CONN_STATE_LINGER;
        }
        else if (c->aborted) {
            /* An aborted connection is closed whatever state was requested. */
            cs->pub.state = CONN_STATE_LINGER;
        }
        if (cs->pub.state == CONN_STATE_LINGER) {
            goto lingering_close;
        }
    }
    else if (cs->pub.state == CONN_STATE_WRITE_COMPLETION) {
        /* Entered in write completion without processing anything in this
         * call: remember it so that the output_pending hooks are run below.
         */
        from_wc_q = 1;
    }

    if (cs->pub.state == CONN_STATE_ASYNC_WAITIO) {
        /* Set a read/write timeout for this connection, and let the
         * event thread poll for read/writeability.
         */
        cs->queue_timestamp = apr_time_now();
        notify_suspend(cs);

        ap_update_child_status(cs->sbh, SERVER_BUSY_READ, NULL);

        /* Modules might set c->cs->sense to CONN_SENSE_WANT_WRITE,
         * the default is CONN_SENSE_WANT_READ still.
         */
        update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ);
        /* The queue insertion and the pollset addition are done together
         * under timeout_mutex.  APR_EEXIST from apr_pollset_add() is not
         * treated as an error.
         * NOTE(review): cs is not touched after a successful add; presumably
         * another thread may take the connection over from that point on --
         * confirm before adding code after the unlock.
         */
        apr_thread_mutex_lock(timeout_mutex);
        TO_QUEUE_APPEND(cs->sc->io_q, cs);
        rv = apr_pollset_add(event_pollset, &cs->pfd);
        if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
            /* Undo the queue insertion, release the mutex, then log, close
             * the connection and call signal_threads(ST_GRACEFUL).
             */
            AP_DEBUG_ASSERT(0);
            TO_QUEUE_REMOVE(cs->sc->io_q, cs);
            apr_thread_mutex_unlock(timeout_mutex);
            ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(10503)
                         "process_socket: apr_pollset_add failure in "
                         "CONN_STATE_ASYNC_WAITIO");
            close_connection(cs);
            signal_threads(ST_GRACEFUL);
        }
        else {
            apr_thread_mutex_unlock(timeout_mutex);
        }
        return;
    }

    if (cs->pub.state == CONN_STATE_WRITE_COMPLETION) {
        /* pending stays DECLINED unless output is found to be pending. */
        int pending = DECLINED;

        /* Flush all pending outputs before going to CONN_STATE_KEEPALIVE or
         * straight to CONN_STATE_PROCESSING if inputs are pending already.
         */
        
        ap_update_child_status(cs->sbh, SERVER_BUSY_WRITE, NULL);

        /* Only when the connection entered this call in write completion
         * are the output_pending hooks run, and only once; otherwise output
         * is considered pending only if ap_filter_should_yield() says so
         * for the output filters.
         */
        if (from_wc_q) {
            from_wc_q = 0; /* one shot */
            pending = ap_run_output_pending(c);
        }
        else if (ap_filter_should_yield(c->output_filters)) {
            pending = OK;
        }
        if (pending == OK) {
            /* Let the event thread poll for write */
            cs->queue_timestamp = apr_time_now();
            notify_suspend(cs);

            /* Add work to pollset. */
            cs->pub.sense = CONN_SENSE_DEFAULT;
            update_reqevents_from_sense(cs, CONN_SENSE_WANT_WRITE);
            /* Same locking and error handling as for the other timeout
             * queues in this function, here with wc_q.
             */
            apr_thread_mutex_lock(timeout_mutex);
            TO_QUEUE_APPEND(cs->sc->wc_q, cs);
            rv = apr_pollset_add(event_pollset, &cs->pfd);
            if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
                AP_DEBUG_ASSERT(0);
                TO_QUEUE_REMOVE(cs->sc->wc_q, cs);
                apr_thread_mutex_unlock(timeout_mutex);
                ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03465)
                             "process_socket: apr_pollset_add failure in "
                             "CONN_STATE_WRITE_COMPLETION");
                close_connection(cs);
                signal_threads(ST_GRACEFUL);
            }
            else {
                apr_thread_mutex_unlock(timeout_mutex);
            }
            return;
        }
        /* Nothing left to write.  Close the connection if the hooks
         * returned something other than OK/DECLINED, if it was aborted, or
         * if it is not marked AP_CONN_KEEPALIVE.
         */
        if (pending != DECLINED || c->aborted || c->keepalive != AP_CONN_KEEPALIVE) {
            cs->pub.state = CONN_STATE_LINGER;
            goto lingering_close;
        }
        /* Input is already pending: process it now instead of waiting in
         * keepalive.
         */
        if (ap_run_input_pending(c) == OK) {
            goto process_connection;
        }
        /* Do not enter keepalive when listener_may_exit is set. */
        if (listener_may_exit) {
            cs->pub.state = CONN_STATE_LINGER;
            goto lingering_close;
        }

        /* Fall through */
        cs->pub.state = CONN_STATE_KEEPALIVE;
    }

    if (cs->pub.state == CONN_STATE_KEEPALIVE) {
        ap_update_child_status(cs->sbh, SERVER_BUSY_KEEPALIVE, NULL);

        /* It greatly simplifies the logic to use a single timeout value per q
         * because the new element can just be added to the end of the list and
         * it will stay sorted in expiration time sequence.  If brand new
         * sockets are sent to the event thread for a readability check, this
         * will be a slight behavior change - they use the non-keepalive
         * timeout today.  With a normal client, the socket will be readable in
         * a few milliseconds anyway.
         */
        cs->queue_timestamp = apr_time_now();
        notify_suspend(cs);

        /* Add work to pollset. */
        cs->pub.sense = CONN_SENSE_DEFAULT;
        update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ);
        /* Same locking and error handling as for the other timeout queues
         * in this function, here with ka_q.
         */
        apr_thread_mutex_lock(timeout_mutex);
        TO_QUEUE_APPEND(cs->sc->ka_q, cs);
        rv = apr_pollset_add(event_pollset, &cs->pfd);
        if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
            AP_DEBUG_ASSERT(0);
            TO_QUEUE_REMOVE(cs->sc->ka_q, cs);
            apr_thread_mutex_unlock(timeout_mutex);
            ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03093)
                         "process_socket: apr_pollset_add failure for "
                         "keep alive");
            close_connection(cs);
            signal_threads(ST_GRACEFUL);
        }
        else {
            apr_thread_mutex_unlock(timeout_mutex);
        }
        return;
    }

    if (cs->pub.state == CONN_STATE_SUSPENDED) {
        /* The connection is not queued nor polled here: record cs in the
         * conn_rec's suspended_baton, count it in suspended_count and
         * return.
         */
        cs->c->suspended_baton = cs;
        apr_atomic_inc32(&suspended_count);
        notify_suspend(cs);
        return;
    }

    /* Reached by goto for the lingering states, and by falling through
     * when none of the state checks above matched.
     */
 lingering_close:
    /* CONN_STATE_LINGER[_*] fall through process_lingering_close() */
    process_lingering_close(cs);
}