apr_status_t h2_session_process()

in modules/http2/h2_session.c [1807:2074]


/*
 * Run the HTTP/2 session state machine for the session's c1 connection.
 *
 * A session that is still in H2_SESSION_ST_INIT is started first (or shut
 * down right away when h2_protocol_is_acceptable_c1() rejects the
 * connection). Afterwards the main loop runs until the session reaches
 * H2_SESSION_ST_DONE, or until one of the `async` branches leaves the loop
 * early via `goto leaving`.
 *
 * Errors from flushing and polling are not returned to the caller directly;
 * they are fed into the state machine as H2_SESSION_EV_CONN_ERROR /
 * H2_SESSION_EV_CONN_TIMEOUT events and only show up in the return value if
 * the session ends up in H2_SESSION_ST_DONE.
 *
 * @param session    the session to process; its state and flags are read and
 *                   updated throughout
 * @param async      when non-zero, the function returns to the caller instead
 *                   of blocking in a poll in two situations: the session is
 *                   IDLE after a read attempt, or it is in WAIT with open
 *                   streams and h2_send_flow_blocked() reports true
 * @param pkeepalive out parameter, must not be NULL (it is dereferenced
 *                   unconditionally). Cleared on entry; on return it is
 *                   non-zero iff there are no open streams and
 *                   session->remote.emitted_count is non-zero
 * @return APR_EOF when the session is in H2_SESSION_ST_DONE on return,
 *         APR_SUCCESS otherwise
 */
apr_status_t h2_session_process(h2_session *session, int async,
                                int *pkeepalive)
{
    apr_status_t status = APR_SUCCESS;
    conn_rec *c = session->c1;
    /* rv only receives the out value of h2_session_start() and is not read
     * again in this function. trace caches the TRACE3 log-level check so it
     * is evaluated once for both the entry and the exit log line. */
    int rv, mpm_state, trace = APLOGctrace3(c);

    *pkeepalive = 0;
    if (trace) {
        ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
                      H2_SSSN_MSG(session, "process start, async=%d"), async);
    }

    /* One-time startup, only while the session has not left INIT yet. */
    if (H2_SESSION_ST_INIT == session->state) {
        if (!h2_protocol_is_acceptable_c1(c, session->r, 1)) {
            /* Connection rejected: report and shut down using the nghttp2
             * text for NGHTTP2_INADEQUATE_SECURITY. */
            const char *msg = nghttp2_strerror(NGHTTP2_INADEQUATE_SECURITY);
            update_child_status(session, SERVER_BUSY_READ, msg, NULL);
            h2_session_shutdown(session, APR_EINVAL, msg, 1);
        }
        else {
            update_child_status(session, SERVER_BUSY_READ, "init", NULL);
            status = h2_session_start(session, &rv);
            /* Logged regardless of outcome; the start status is attached to
             * the log record. */
            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c,
                          H2_SSSN_LOG(APLOGNO(03079), session,
                          "started on %s:%d"),
                          session->s->server_hostname,
                          c->local_addr->port);
            if (status != APR_SUCCESS) {
                h2_session_dispatch_event(session,
                               H2_SESSION_EV_CONN_ERROR, status, NULL);
            }
            else {
                h2_session_dispatch_event(session, H2_SESSION_EV_INIT, 0, NULL);
            }
        }
    }

    while (session->state != H2_SESSION_ST_DONE) {

        /* PR65731: we may get a new connection to process while the
         * MPM already is stopping. For example due to having reached
         * MaxRequestsPerChild limit.
         * Since this is supposed to handle things gracefully, we need to:
         * a) fully initialize the session before GOAWAYing
         * b) give the client the chance to submit at least one request
         */
        if (session->state != H2_SESSION_ST_INIT /* no longer initializing */
            && session->local.accepted_max > 0   /* have gotten at least one stream */
            && session->local.accepting          /* have not already locally shut down */
            && !ap_mpm_query(AP_MPMQ_MPM_STATE, &mpm_state)) {
            /* mpm_state is only inspected when the query above returned 0. */
            if (mpm_state == AP_MPMQ_STOPPING) {
                h2_session_dispatch_event(session, H2_SESSION_EV_MPM_STOPPING, 0, NULL);
            }
        }

        /* Reset the session's status text to the empty string for this
         * iteration. */
        session->status[0] = '\0';
        
        /* First send opportunity. If there is nothing to send and nghttp2
         * also does not want to read any more, signal NGH2_DONE. */
        if (h2_session_want_send(session)) {
            h2_session_send(session);
        }
        else if (!nghttp2_session_want_read(session->ngh2)) {
            h2_session_dispatch_event(session, H2_SESSION_EV_NGH2_DONE, 0, NULL);
        }

        /* Hand queued stream ids over to the mplx for processing;
         * open_streams is passed by address so the call can update it. */
        if (!h2_iq_empty(session->ready_to_process)) {
            h2_mplx_c1_process(session->mplx, session->ready_to_process,
                               get_stream, stream_pri_cmp, session,
                               &session->open_streams);
            transit(session, "scheduled stream", H2_SESSION_ST_BUSY);
        }

        /* input_flushed is a one-shot flag: act on it, then clear it. */
        if (session->input_flushed) {
            transit(session, "forwarded input", H2_SESSION_ST_BUSY);
            session->input_flushed = 0;
        }

        if (!h2_iq_empty(session->out_c1_blocked)) {
            unblock_c1_out(session);
            transit(session, "unblocked output", H2_SESSION_ST_BUSY);
        }

        /* reprioritize is a one-shot flag as well. */
        if (session->reprioritize) {
            h2_mplx_c1_reprioritize(session->mplx, stream_pri_cmp, session);
            session->reprioritize = 0;
        }

        /* Second send opportunity: the steps above may have produced new
         * output since the first check. */
        if (h2_session_want_send(session)) {
            h2_session_send(session);
        }

        /* A flush failure is turned into a CONN_ERROR event; the switch
         * below then runs on whatever state that event left behind. */
        status = h2_c1_io_assure_flushed(&session->io);
        if (APR_SUCCESS != status) {
            h2_session_dispatch_event(session, H2_SESSION_EV_CONN_ERROR, status, NULL);
        }

        switch (session->state) {
        case H2_SESSION_ST_INIT:
            /* The session is expected to have left INIT before the loop.
             * NOTE(review): if ap_assert() does not return on failure, the
             * read below is never reached - confirm against its definition. */
            ap_assert(0);
            h2_c1_read(session);
            break;

        case H2_SESSION_ST_IDLE:
            /* Invariants asserted for IDLE: no open streams, and nghttp2
             * still wants to read. */
            ap_assert(session->open_streams == 0);
            ap_assert(nghttp2_session_want_read(session->ngh2));
            if (!h2_session_want_send(session)) {
                /* Give any new incoming request a short grace period to
                 * arrive while we are still hot and return to the mpm
                 * connection handling when nothing really happened. */
                h2_c1_read(session);
                /* Only continue here when the read did not change the
                 * state; otherwise the next loop iteration handles it. */
                if (H2_SESSION_ST_IDLE == session->state) {
                    if (async) {
                        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c,
                                      H2_SSSN_LOG(APLOGNO(10306), session,
                                      "returning to mpm c1 monitoring"));
                        goto leaving;
                    }
                    else {
                        /* Not an async mpm, we must continue waiting
                         * for client data to arrive until the configured
                         * server Timeout/KeepAliveTimeout happens */
                        /* Same condition as the final *pkeepalive value:
                         * keep_alive_timeout applies once emitted_count is
                         * non-zero and no stream is open, timeout otherwise. */
                        apr_time_t timeout = ((session->open_streams == 0) &&
                                              session->remote.emitted_count)?
                            session->s->keep_alive_timeout :
                            session->s->timeout;
                        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
                                      H2_SSSN_MSG(session, "polling timeout=%d"),
                                      (int)apr_time_sec(timeout));
                        status = h2_mplx_c1_poll(session->mplx, timeout,
                                                 on_stream_input,
                                                 on_stream_output, session);
                        if (APR_STATUS_IS_TIMEUP(status)) {
                            /* A timeout counts as a connection timeout only
                             * if there are still no open streams after the
                             * poll. */
                            if (session->open_streams == 0) {
                                h2_session_dispatch_event(session,
                                    H2_SESSION_EV_CONN_TIMEOUT, status, NULL);
                                break;
                            }
                        }
                        else if (APR_SUCCESS != status) {
                            h2_session_dispatch_event(session,
                                H2_SESSION_EV_CONN_ERROR, status, NULL);
                            break;
                        }
                    }
                }
            }
            else {
                transit(session, "c1 io pending", H2_SESSION_ST_BUSY);
            }
            break;

        case H2_SESSION_ST_BUSY:
            /* IO happening in and out. Make sure we react to c2 events
             * in between send and receive. */
            /* Poll with a timeout of 0; a TIMEUP result is therefore not
             * treated as an error here. */
            status = h2_mplx_c1_poll(session->mplx, 0,
                                     on_stream_input, on_stream_output, session);
            if (APR_SUCCESS != status && !APR_STATUS_IS_TIMEUP(status)) {
                h2_session_dispatch_event(session, H2_SESSION_EV_CONN_ERROR, status, NULL);
                break;
            }
            h2_c1_read(session);
            break;

        case H2_SESSION_ST_WAIT:
            /* In this state, we might have returned processing to the MPM
             * before. On a connection socket event, we are invoked again and
             * need to process any input before proceeding. */
            h2_c1_read(session);
            if (session->state != H2_SESSION_ST_WAIT) {
                break;
            }

            /* Flush again after the read, before deciding to wait. */
            status = h2_c1_io_assure_flushed(&session->io);
            if (APR_SUCCESS != status) {
                h2_session_dispatch_event(session, H2_SESSION_EV_CONN_ERROR, status, NULL);
                break;
            }
            if (session->open_streams == 0) {
                h2_session_dispatch_event(session, H2_SESSION_EV_NO_MORE_STREAMS,
                                          0, "streams really done");
                /* If the event left us in WAIT, fall through to the
                 * blocking poll below. */
                if (session->state != H2_SESSION_ST_WAIT) {
                    break;
                }
            }
            else if (async && h2_send_flow_blocked(session)) {
                /* By returning to the MPM, we do not block a worker
                 * and async wait for the client send window updates. */
                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c,
                              H2_SSSN_LOG(APLOGNO(10502), session,
                              "BLOCKED, return to mpm c1 monitoring"));
                goto leaving;
            }

            /* No IO happening and input is exhausted. Wait with
             * the c1 connection timeout for something to happen in our c1/c2 sockets/pipes */
            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
                          H2_SSSN_MSG(session, "polling timeout=%d, open_streams=%d"),
                          (int)apr_time_sec(session->s->timeout), session->open_streams);
            status = h2_mplx_c1_poll(session->mplx, session->s->timeout,
                                     on_stream_input, on_stream_output, session);
            if (APR_STATUS_IS_TIMEUP(status)) {
                /* If we timeout without streams open, no new request from client
                 * arrived.
                 * If we timeout without nghttp2 wanting to write something, but
                 * all open streams have something to send, it means we are
                 * blocked on HTTP/2 flow control and the client did not send
                 * WINDOW_UPDATEs to us. */
                if (session->open_streams == 0 ||
                    (!h2_session_want_send(session) &&
                     h2_mplx_c1_all_streams_want_send_data(session->mplx))) {
                    h2_session_dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, status, NULL);
                    break;
                }
                /* Otherwise the timeout is ignored and the loop continues. */
            }
            else if (APR_SUCCESS != status) {
                h2_session_dispatch_event(session, H2_SESSION_EV_CONN_ERROR, status, NULL);
                break;
            }
            break;

        case H2_SESSION_ST_DONE:
            /* Reachable when an event dispatched earlier in this iteration
             * (after the loop condition was checked) moved the session to
             * DONE. The loop condition ends the loop afterwards unless the
             * read changes the state again. */
            h2_c1_read(session);
            break;

        default:
            /* Any state value not handled above is reported and turned into
             * a PROTO_ERROR event. */
            ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, c,
                          H2_SSSN_LOG(APLOGNO(03080), session,
                          "unknown state"));
            h2_session_dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, APR_EGENERAL, NULL);
            break;
        }
    }

leaving:
    /* entering KeepAlive timing when we have no more open streams AND
     * we have processed at least one stream. */
    *pkeepalive = (session->open_streams == 0 && session->remote.emitted_count);
    if (trace) {
        ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, c,
                      H2_SSSN_MSG(session, "process returns, keepalive=%d"),
                      *pkeepalive);
    }
    /* Called on every exit path, independent of the *pkeepalive value. */
    h2_mplx_c1_going_keepalive(session->mplx);

    if (session->state == H2_SESSION_ST_DONE) {
        /* Report why we are closing: the explicit local error message if
         * one was recorded, else the APR text for the local error code,
         * else plain "done". */
        if (session->local.error) {
            char buffer[128];
            const char *msg;
            if (session->local.error_msg) {
                msg = session->local.error_msg;
            }
            else {
                msg = apr_strerror(session->local.error, buffer, sizeof(buffer));
            }
            update_child_status(session, SERVER_CLOSING, msg, NULL);
        }
        else {
            update_child_status(session, SERVER_CLOSING, "done", NULL);
        }
    }
    /* Not DONE (we left the loop via goto), but the last recorded status
     * says the connection is gone: raise CONN_ERROR now. The return
     * expression below re-reads session->state after this dispatch. */
    else if (APR_STATUS_IS_EOF(status)
            || APR_STATUS_IS_ECONNRESET(status) 
            || APR_STATUS_IS_ECONNABORTED(status)) {
        h2_session_dispatch_event(session, H2_SESSION_EV_CONN_ERROR, status, NULL);
        update_child_status(session, SERVER_CLOSING, "error", NULL);
    }

    return (session->state == H2_SESSION_ST_DONE)? APR_EOF : APR_SUCCESS;
}