void ProcessHttpRequest()

in src/brpc/policy/http_rpc_protocol.cpp [1403:1720]


// Handles one fully parsed HTTP/1.x or HTTP/2 request on the server side.
//
// Steps, in order:
//   1. Wrap `msg' and the socket it carries in scoped guards and create the
//      Controller for this call.
//   2. Copy request metadata (addresses, protocol, log-id, request-id and,
//      if the request is traceable, a server Span) into the Controller.
//   3. Pick the target: ServerOptions.http_master_service when it is set,
//      otherwise the method FindMethodPropertyByURI() maps the URI path to.
//   4. Apply admission checks (per-method status, overcrowded socket,
//      server concurrency, usercode_in_pthread backlog, AcceptRequest).
//   5. Either convert the body into the protobuf request or hand it over
//      untouched as the request attachment.
//   6. Invoke the service method with an HttpResponseSenderAsDone closure.
//
// Every early `return' after `resp_sender' is constructed only marks `cntl'
// as failed. NOTE(review): the reply is presumably sent by `resp_sender' when
// it goes out of scope -- its definition is outside this view, confirm.
//
// `msg' is static_cast to HttpContext (and to H2StreamContext when the header
// reports HTTP/2) and `msg->arg()' is static_cast to Server, so the caller
// must pass objects of exactly those types.
void ProcessHttpRequest(InputMessageBase *msg) {
    // Taken first: later recorded in the span and passed to sample->submit().
    const int64_t start_parse_us = butil::cpuwide_time_us();
    // Scoped guards for the message and for the socket released from it.
    DestroyingPtr<HttpContext> imsg_guard(static_cast<HttpContext*>(msg));
    SocketUniquePtr socket_guard(imsg_guard->ReleaseSocket());
    Socket* socket = socket_guard.get();
    const Server* server = static_cast<const Server*>(msg->arg());
    // Stays armed until release() is called below, once a method is resolved.
    ScopedNonServiceError non_service_error(server);

    // nothrow form: allocation failure yields NULL instead of an exception.
    Controller* cntl = new (std::nothrow) Controller;
    if (NULL == cntl) {
        LOG(FATAL) << "Fail to new Controller";
        return;
    }
    HttpResponseSender resp_sender(cntl);
    resp_sender.set_received_us(msg->received_us());

    // Read the protocol flag before the header is swapped into the
    // controller below.
    const bool is_http2 = imsg_guard->header().is_http2();
    if (is_http2) {
        H2StreamContext* h2_sctx = static_cast<H2StreamContext*>(msg);
        resp_sender.set_h2_stream_id(h2_sctx->stream_id());
    }

    ControllerPrivateAccessor accessor(cntl);
    // Swap the parsed header into the controller; from here on the request
    // header is accessed through `req_header' only.
    HttpHeader& req_header = cntl->http_request();
    imsg_guard->header().Swap(req_header);
    butil::IOBuf& req_body = imsg_guard->body();
    // Prefer an address carried in the request header; otherwise fall back to
    // the peer address of the connection.
    butil::EndPoint user_addr;
    if (!GetUserAddressFromHeader(req_header, &user_addr)) {
        user_addr = socket->remote_side();
    }
    ServerPrivateAccessor server_accessor(server);
    // Security mode needs both the server option and a socket whose user is
    // this server's acceptor.
    // NOTE(review): presumably this excludes connections accepted on
    // ServerOptions.internal_port -- confirm against Server.
    const bool security_mode = server->options().security_mode() &&
                               socket->user() == server_accessor.acceptor();
    // `socket_guard' is handed to the controller by the last call in this
    // chain; the raw `socket' pointer keeps being used further down.
    accessor.set_server(server)
        .set_security_mode(security_mode)
        .set_peer_id(socket->id())
        .set_remote_side(user_addr)
        .set_local_side(socket->local_side())
        .set_auth_context(socket->auth_context())
        .set_request_protocol(is_http2 ? PROTOCOL_H2 : PROTOCOL_HTTP)
        .set_begin_time_us(msg->received_us())
        .move_in_server_receiving_sock(socket_guard);
    
    // Read log-id. errno may be set when input to strtoull overflows.
    // atoi/atol/atoll don't support 64-bit integer and can't be used.
    // A non-zero *logid_end means characters were left unparsed; in either
    // failure case the log-id is not set and only an error is logged.
    const std::string* log_id_str = req_header.GetHeader(common->LOG_ID);
    if (log_id_str) {
        char* logid_end = NULL;
        errno = 0;
        uint64_t logid = strtoull(log_id_str->c_str(), &logid_end, 10);
        if (*logid_end || errno) {
            LOG(ERROR) << "Invalid " << common->LOG_ID << '=' 
                       << *log_id_str << " in http request";
        } else {
            cntl->set_log_id(logid);
        }
    }

    // The request-id header name is configurable via FLAGS_request_id_header.
    const std::string* request_id = req_header.GetHeader(FLAGS_request_id_header);
    if (request_id) {
        cntl->set_request_id(*request_id);
    }

    // Tag the bthread with this server's key for
    // thread_local_data().
    if (server->thread_local_options().thread_local_data_factory) {
        bthread_assign_data((void*)&server->thread_local_options());
    }

    // Create a server span when IsTraceable() accepts the request.
    // NOTE(review): unlike the log-id above, the three ids are parsed without
    // an end pointer or errno check, so malformed values are not rejected.
    Span* span = NULL;
    const std::string& path = req_header.uri().path();
    const std::string* trace_id_str = req_header.GetHeader("x-bd-trace-id");
    if (IsTraceable(trace_id_str)) {
        uint64_t trace_id = 0;
        if (trace_id_str) {
            trace_id = strtoull(trace_id_str->c_str(), NULL, 10);
        }
        uint64_t span_id = 0;
        const std::string* span_id_str = req_header.GetHeader("x-bd-span-id");
        if (span_id_str) {
            span_id = strtoull(span_id_str->c_str(), NULL, 10);
        }
        uint64_t parent_span_id = 0;
        const std::string* parent_span_id_str =
            req_header.GetHeader("x-bd-parent-span-id");
        if (parent_span_id_str) {
            parent_span_id = strtoull(parent_span_id_str->c_str(), NULL, 10);
        }
        // The span is first named after the URI path; it is renamed to the
        // method's full name once the method is known.
        span = Span::CreateServerSpan(
            path, trace_id, span_id, parent_span_id, msg->base_real_us());
        accessor.set_span(span);
        span->set_log_id(cntl->log_id());
        span->set_remote_side(user_addr);
        span->set_received_us(msg->received_us());
        span->set_start_parse_us(start_parse_us);
        span->set_protocol(is_http2 ? PROTOCOL_H2 : PROTOCOL_HTTP);
        span->set_request_size(imsg_guard->parsed_length());
    }
    
    // Refuse new work when the server is not running.
    if (!server->IsRunning()) {
        cntl->SetFailed(ELOGOFF, "Server is stopping");
        return;
    }

    if (server->options().http_master_service) {
        // If http_master_service is on, just call it.
        // URI lookup and all admission checks further down are skipped on
        // this path; the body is passed as the request attachment.
        google::protobuf::Service* svc = server->options().http_master_service;
        const google::protobuf::MethodDescriptor* md =
            svc->GetDescriptor()->FindMethodByName(common->DEFAULT_METHOD);
        if (md == NULL) {
            cntl->SetFailed(ENOMETHOD, "No default_method in http_master_service");
            return;
        }
        accessor.set_method(md);
        cntl->request_attachment().swap(req_body);
        google::protobuf::Closure* done = new HttpResponseSenderAsDone(&resp_sender);
        if (span) {
            span->ResetServerSpanName(md->full_name());
            span->set_start_callback_us(butil::cpuwide_time_us());
            span->AsParent();
        }
        // `cntl', `req' and `res' will be deleted inside `done'
        // (on this path `req' and `res' are passed as NULL).
        return svc->CallMethod(md, cntl, NULL, NULL, done);
    }
    
    // Map the URI path to a registered method.
    // NOTE(review): `_unresolved_path' is passed as an output argument,
    // presumably receiving the part of the path left unmatched -- confirm.
    const Server::MethodProperty* const mp =
        FindMethodPropertyByURI(path, server, &req_header._unresolved_path);
    if (NULL == mp) {
        // In security mode the path is web-escaped before being echoed in
        // the error text.
        if (security_mode) {
            std::string escape_path;
            WebEscape(path, &escape_path);
            cntl->SetFailed(ENOMETHOD, "Fail to find method on `%s'", escape_path.c_str());
        } else {
            cntl->SetFailed(ENOMETHOD, "Fail to find method on `%s'", path.c_str());
        }
        return;
    } else if (mp->service->GetDescriptor() == BadMethodService::descriptor()) {
        // The path resolved to BadMethodService: call it synchronously
        // (NULL done) with the first '/'-separated field of the path as the
        // service name, then return.
        BadMethodRequest breq;
        BadMethodResponse bres;
        butil::StringSplitter split(path.c_str(), '/');
        breq.set_service_name(std::string(split.field(), split.length()));
        mp->service->CallMethod(mp->method, cntl, &breq, &bres, NULL);
        return;
    }
    // Switch to service-specific error.
    non_service_error.release();
    // `mp->status' may be NULL; it is handed to the sender either way and
    // consulted here only when present.
    MethodStatus* method_status = mp->status;
    resp_sender.set_method_status(method_status);
    if (method_status) {
        int rejected_cc = 0;
        if (!method_status->OnRequested(&rejected_cc)) {
            cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
                            mp->method->full_name().c_str(), rejected_cc);
            return;
        }
    }
    
    if (span) {
        span->ResetServerSpanName(mp->method->full_name());
    }
    // NOTE: accesses to builtin services are not counted as part of
    // concurrency, therefore are not limited by ServerOptions.max_concurrency.
    if (!mp->is_builtin_service && !mp->params.is_tabbed) {
        // An overcrowded connection is rejected unless either the server
        // option or the method property says to ignore it.
        if (socket->is_overcrowded() &&
            !server->options().ignore_eovercrowded &&
            !mp->ignore_eovercrowded) {
            cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
                            butil::endpoint2str(socket->remote_side()).c_str());
            return;
        }
        if (!server_accessor.AddConcurrency(cntl)) {
            cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
                            server->options().max_concurrency);
            return;
        }
        if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
            cntl->SetFailed(ELIMIT, "Too many user code to run when"
                            " -usercode_in_pthread is on");
            return;
        }
        // No SetFailed here.
        // NOTE(review): AcceptRequest() presumably fills in the error on
        // `cntl' itself when it refuses -- confirm.
        if (!server->AcceptRequest(cntl)) {
            return;
        }
    } else if (security_mode) {
        // Builtin or tabbed service requested while in security mode.
        cntl->SetFailed(EPERM, "Not allowed to access builtin services, try "
                        "ServerOptions.internal_port=%d instead if you're in"
                        " internal network", server->options().internal_port);
        return;
    }

    google::protobuf::Service* svc = mp->service;
    const google::protobuf::MethodDescriptor* method = mp->method;
    accessor.set_method(method);
    // Obtain the request/response pair from the configured factory and let
    // the sender know about it.
    // NOTE(review): `messages' is dereferenced without a NULL check; confirm
    // the factory never returns NULL. The doubled ';' on the next line is a
    // harmless empty statement.
    RpcPBMessages* messages = server->options().rpc_pb_message_factory->Get(*svc, *method);;
    resp_sender.set_messages(messages);
    google::protobuf::Message* req = messages->Request();
    google::protobuf::Message* res = messages->Response();

    if (__builtin_expect(!req || !res, 0)) {
        PLOG(FATAL) << "Fail to new req or res";
        cntl->SetFailed("Fail to new req or res");
        return;
    }
    // The body is converted into `req' only when the method allows it and
    // the request type has at least one field; otherwise see the else branch.
    if (mp->params.allow_http_body_to_pb &&
        method->input_type()->field_count() > 0) {
        // A protobuf service. No matter if Content-type is set to
        // application/json or body is empty, we have to treat body as a json
        // and try to convert it to pb, which guarantees that a protobuf
        // service is always accessed with valid requests.
        if (req_body.empty()) {
            // Treat empty body specially since parsing it results in error
            if (!req->IsInitialized()) {
                cntl->SetFailed(EREQUEST, "%s needs to be created from a"
                                " non-empty json, it has required fields.",
                                req->GetDescriptor()->full_name().c_str());
                return;
            } // else all fields of the request are optional.
        } else {
            bool is_grpc_ct = false;
            const HttpContentType content_type =
                ParseContentType(req_header.content_type(), &is_grpc_ct);
            const std::string* encoding = NULL;
            if (is_http2 && is_grpc_ct) {
                // gRPC request: RemoveGrpcPrefix() is given the body and
                // reports through `grpc_compressed' whether the message is
                // flagged as compressed.
                bool grpc_compressed = false;
                if (!RemoveGrpcPrefix(&req_body, &grpc_compressed)) {
                    cntl->SetFailed(EREQUEST, "Invalid gRPC request");
                    return;
                }
                // A compressed message must name its encoding in a header.
                if (grpc_compressed) {
                    encoding = req_header.GetHeader(common->GRPC_ENCODING);
                    if (encoding == NULL) {
                        cntl->SetFailed(
                            EREQUEST, "Fail to find header `grpc-encoding'"
                            " in compressed gRPC request");
                        return;
                    }
                }
                // A negative conversion result leaves the deadline unset.
                int64_t timeout_value_us =
                    ConvertGrpcTimeoutToUS(req_header.GetHeader(common->GRPC_TIMEOUT));
                if (timeout_value_us >= 0) {
                    accessor.set_deadline_us(
                            butil::gettimeofday_us() + timeout_value_us);
                }
            } else { // http or h2 but not grpc
                encoding = req_header.GetHeader(common->CONTENT_ENCODING);
            }
            // Only gzip is decoded here; with any other encoding value the
            // body is parsed as received.
            if (encoding != NULL && *encoding == common->GZIP) {
                TRACEPRINTF("Decompressing request=%lu",
                            (unsigned long)req_body.size());
                butil::IOBuf uncompressed;
                if (!policy::GzipDecompress(req_body, &uncompressed)) {
                    cntl->SetFailed(EREQUEST, "Fail to un-gzip request body");
                    return;
                }
                req_body.swap(uncompressed);
            }
            // Choose the parser by content type; anything that is not one of
            // the three proto types falls through to the JSON converter.
            if (content_type == HTTP_CONTENT_PROTO) {
                if (!ParsePbFromIOBuf(req, req_body)) {
                    cntl->SetFailed(EREQUEST, "Fail to parse http body as %s",
                                    req->GetDescriptor()->full_name().c_str());
                    return;
                }
            } else if (content_type == HTTP_CONTENT_PROTO_TEXT) {
                if (!ParsePbTextFromIOBuf(req, req_body)) {
                    cntl->SetFailed(EREQUEST, "Fail to parse http proto-text body as %s",
                                    req->GetDescriptor()->full_name().c_str());
                    return;
                }
            } else if (content_type == HTTP_CONTENT_PROTO_JSON) {
                // NOTE(review): no SetFailed on this path; the helper is
                // given `cntl' and EREQUEST and presumably reports the
                // failure itself -- confirm.
                if (!ProtoJsonToProtoMessage(req_body, req, cntl, EREQUEST)) {
                    return;
                }
            } else {
                // Per-method JSON conversion options are copied onto the
                // controller before the conversion runs.
                // NOTE(review): as above, the helper presumably sets the
                // failure on `cntl' itself -- confirm.
                cntl->set_pb_bytes_to_base64(mp->params.pb_bytes_to_base64);
                cntl->set_pb_single_repeated_to_array(mp->params.pb_single_repeated_to_array);
                if (!JsonToProtoMessage(req_body, req, cntl, EREQUEST)) {
                    return;
                }
            }
        }
        // Request sampling is attempted for HTTP/1.x only.
        if (!is_http2) {
            SampledRequest* sample = AskToBeSampled();
            if (sample) {
                sample->meta.set_compress_type(COMPRESS_TYPE_NONE);
                sample->meta.set_protocol_type(PROTOCOL_HTTP);
                sample->meta.set_attachment_size(req_body.size());

                // `ep' is passed default-constructed.
                butil::EndPoint ep;
                MakeRawHttpRequest(&sample->request, &req_header, ep, &req_body);
                sample->submit(start_parse_us);
            }
        }
    } else {
        // The body is not converted to protobuf: either expose the message
        // as a progressive attachment or move the whole body into the
        // request attachment.
        if (imsg_guard->read_body_progressively()) {
            accessor.set_readable_progressive_attachment(imsg_guard.get());
        } else {
            // A http server, just keep content as it is.
            cntl->request_attachment().swap(req_body);
        }
    }

    google::protobuf::Closure* done = new HttpResponseSenderAsDone(&resp_sender);
    // `req_body' refers into the message guarded by `imsg_guard' and is not
    // used past this point.
    imsg_guard.reset();  // optional, just release resource ASAP

    if (span) {
        span->set_start_callback_us(butil::cpuwide_time_us());
        span->AsParent();
    }
    // Without -usercode_in_pthread the method is called directly. With it,
    // the call runs in place when BeginRunningUserCode() allows it and is
    // otherwise handed to EndRunningCallMethodInPool().
    if (!FLAGS_usercode_in_pthread) {
        return svc->CallMethod(method, cntl, req, res, done);
    }
    if (BeginRunningUserCode()) {
        svc->CallMethod(method, cntl, req, res, done);
        return EndRunningUserCodeInPlace();
    } else {
        return EndRunningCallMethodInPool(svc, method, cntl, req, res, done);
    }
}