void ProcessSofaRequest()

in src/brpc/policy/sofa_pbrpc_protocol.cpp [318:487]


void ProcessSofaRequest(InputMessageBase* msg_base) {
    const int64_t start_parse_us = butil::cpuwide_time_us();
    DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base));
    SocketUniquePtr socket_guard(msg->ReleaseSocket());
    Socket* socket = socket_guard.get();
    const Server* server = static_cast<const Server*>(msg_base->arg());
    ScopedNonServiceError non_service_error(server);

    SofaRpcMeta meta;
    if (!ParsePbFromIOBuf(&meta, msg->meta)) {
        LOG(WARNING) << "Fail to parse SofaRpcMeta from " << *socket;
        socket->SetFailed(EREQUEST, "Fail to parse SofaRpcMeta from %s",
                          socket->description().c_str());
        return;
    }
    const CompressType req_cmp_type = Sofa2CompressType(meta.compress_type());

    SampledRequest* sample = AskToBeSampled();
    if (sample) {
        sample->meta.set_method_name(meta.method());
        sample->meta.set_compress_type(req_cmp_type);
        sample->meta.set_protocol_type(PROTOCOL_SOFA_PBRPC);
        sample->request = msg->payload;
        sample->submit(start_parse_us);
    }

    std::unique_ptr<Controller> cntl(new (std::nothrow) Controller);
    if (NULL == cntl.get()) {
        LOG(WARNING) << "Fail to new Controller";
        return;
    }
    std::unique_ptr<google::protobuf::Message> req;
    std::unique_ptr<google::protobuf::Message> res;

    ControllerPrivateAccessor accessor(cntl.get());
    ServerPrivateAccessor server_accessor(server);
    const int64_t correlation_id = meta.sequence_id();
    const bool security_mode = server->options().security_mode() &&
                               socket->user() == server_accessor.acceptor();

    cntl->set_request_compress_type(req_cmp_type);
    accessor.set_server(server)
        .set_security_mode(security_mode)
        .set_peer_id(socket->id())
        .set_remote_side(socket->remote_side())
        .set_local_side(socket->local_side())
        .set_auth_context(socket->auth_context())
        .set_request_protocol(PROTOCOL_SOFA_PBRPC)
        .set_begin_time_us(msg->received_us())
        .move_in_server_receiving_sock(socket_guard);

    // Tag the bthread with this server's key for thread_local_data().
    if (server->thread_local_options().thread_local_data_factory) {
        bthread_assign_data((void*)&server->thread_local_options());
    }

    Span* span = NULL;
    if (IsTraceable(false)) {
        span = Span::CreateServerSpan(
            0/*meta.trace_id()*/, 0/*meta.span_id()*/,
            0/*meta.parent_span_id()*/, msg->base_real_us());
        accessor.set_span(span);
        span->set_remote_side(cntl->remote_side());
        span->set_protocol(PROTOCOL_SOFA_PBRPC);
        span->set_received_us(msg->received_us());
        span->set_start_parse_us(start_parse_us);
        span->set_request_size(msg->meta.size() + msg->payload.size() + 24);
    }

    MethodStatus* method_status = NULL;
    do {
        if (!server->IsRunning()) {
            cntl->SetFailed(ELOGOFF, "Server is stopping");
            break;
        }

        if (!server_accessor.AddConcurrency(cntl.get())) {
            cntl->SetFailed(
                ELIMIT, "Reached server's max_concurrency=%d",
                server->options().max_concurrency);
            break;
        }
        if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
            cntl->SetFailed(ELIMIT, "Too many user code to run when"
                            " -usercode_in_pthread is on");
            break;
        }
        
        const Server::MethodProperty *sp =
            server_accessor.FindMethodPropertyByFullName(meta.method());
        if (NULL == sp) {
            cntl->SetFailed(ENOMETHOD, "Fail to find method=%s", 
                            meta.method().c_str());
            break;
        }
        if (socket->is_overcrowded() &&
            !server->options().ignore_eovercrowded &&
            !sp->ignore_eovercrowded) {
            cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
                            butil::endpoint2str(socket->remote_side()).c_str());
            break;
        }
        // Switch to service-specific error.
        non_service_error.release();
        method_status = sp->status;
        if (method_status) {
            int rejected_cc = 0;
            if (!method_status->OnRequested(&rejected_cc)) {
                cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
                                sp->method->full_name().c_str(), rejected_cc);
                break;
            }
        }
        google::protobuf::Service* svc = sp->service;
        const google::protobuf::MethodDescriptor* method = sp->method;
        accessor.set_method(method);

        if (!server->AcceptRequest(cntl.get())) {
            break;
        }

        if (span) {
            span->ResetServerSpanName(method->full_name());
        }
        req.reset(svc->GetRequestPrototype(method).New());
        if (!ParseFromCompressedData(msg->payload, req.get(), req_cmp_type)) {
            cntl->SetFailed(EREQUEST, "Fail to parse request message, "
                            "CompressType=%d, size=%d", 
                            req_cmp_type, (int)msg->payload.size());
            break;
        }

        res.reset(svc->GetResponsePrototype(method).New());
        // `socket' will be held until response has been sent
        google::protobuf::Closure* done = ::brpc::NewCallback<
            int64_t, Controller*, const google::protobuf::Message*,
            const google::protobuf::Message*, const Server*,
                  MethodStatus *, int64_t>(
                    &SendSofaResponse, correlation_id, cntl.get(),
                    req.get(), res.get(), server,
                    method_status, msg->received_us());

        msg.reset();  // optional, just release resource ASAP

        // `cntl', `req' and `res' will be deleted inside `done'
        if (span) {
            span->set_start_callback_us(butil::cpuwide_time_us());
            span->AsParent();
        }
        if (!FLAGS_usercode_in_pthread) {
            return svc->CallMethod(method, cntl.release(), 
                                   req.release(), res.release(), done);
        }
        if (BeginRunningUserCode()) {
            svc->CallMethod(method, cntl.release(), 
                            req.release(), res.release(), done);
            return EndRunningUserCodeInPlace();
        } else {
            return EndRunningCallMethodInPool(
                svc, method, cntl.release(),
                req.release(), res.release(), done);
        }
    } while (false);

    // `cntl', `req' and `res' will be deleted inside `SendSofaResponse'
    // `socket' will be held until response has been sent
    SendSofaResponse(correlation_id, cntl.release(),
                     req.release(), res.release(), server,
                     method_status, msg->received_us());
}