void ProcessRpcRequest()

in src/brpc/policy/baidu_rpc_protocol.cpp [552:839]


void ProcessRpcRequest(InputMessageBase* msg_base) {
    const int64_t start_parse_us = butil::cpuwide_time_us();
    DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base));
    SocketUniquePtr socket_guard(msg->ReleaseSocket());
    Socket* socket = socket_guard.get();
    const Server* server = static_cast<const Server*>(msg_base->arg());
    ScopedNonServiceError non_service_error(server);

    RpcMeta meta;
    if (!ParsePbFromIOBuf(&meta, msg->meta)) {
        LOG(WARNING) << "Fail to parse RpcMeta from " << *socket;
        socket->SetFailed(EREQUEST, "Fail to parse RpcMeta from %s",
                          socket->description().c_str());
        return;
    }
    const RpcRequestMeta &request_meta = meta.request();

    SampledRequest* sample = AskToBeSampled();
    if (sample) {
        sample->meta.set_service_name(request_meta.service_name());
        sample->meta.set_method_name(request_meta.method_name());
        sample->meta.set_compress_type((CompressType)meta.compress_type());
        sample->meta.set_protocol_type(PROTOCOL_BAIDU_STD);
        sample->meta.set_attachment_size(meta.attachment_size());
        sample->meta.set_authentication_data(meta.authentication_data());
        sample->request = msg->payload;
        sample->submit(start_parse_us);
    }

    std::unique_ptr<Controller> cntl(new (std::nothrow) Controller);
    if (NULL == cntl.get()) {
        LOG(WARNING) << "Fail to new Controller";
        return;
    }

    RpcPBMessages* messages = NULL;

    ServerPrivateAccessor server_accessor(server);
    ControllerPrivateAccessor accessor(cntl.get());
    const bool security_mode = server->options().security_mode() &&
                               socket->user() == server_accessor.acceptor();
    if (request_meta.has_log_id()) {
        cntl->set_log_id(request_meta.log_id());
    }
    if (request_meta.has_request_id()) {
        cntl->set_request_id(request_meta.request_id());
    }
    if (request_meta.has_timeout_ms()) {
        cntl->set_timeout_ms(request_meta.timeout_ms());
    }
    cntl->set_request_content_type(meta.content_type());
    cntl->set_request_compress_type((CompressType)meta.compress_type());
    accessor.set_server(server)
        .set_security_mode(security_mode)
        .set_peer_id(socket->id())
        .set_remote_side(socket->remote_side())
        .set_local_side(socket->local_side())
        .set_auth_context(socket->auth_context())
        .set_request_protocol(PROTOCOL_BAIDU_STD)
        .set_begin_time_us(msg->received_us())
        .move_in_server_receiving_sock(socket_guard);

    if (meta.has_stream_settings()) {
        accessor.set_remote_stream_settings(meta.release_stream_settings());
    }

    if (!meta.user_fields().empty()) {
        for (const auto& it : meta.user_fields()) {
            (*cntl->request_user_fields())[it.first] = it.second;
        }
    }

    // Tag the bthread with this server's key for thread_local_data().
    if (server->thread_local_options().thread_local_data_factory) {
        bthread_assign_data((void*)&server->thread_local_options());
    }

    Span* span = NULL;
    if (IsTraceable(request_meta.has_trace_id())) {
        span = Span::CreateServerSpan(
            request_meta.trace_id(), request_meta.span_id(),
            request_meta.parent_span_id(), msg->base_real_us());
        accessor.set_span(span);
        span->set_log_id(request_meta.log_id());
        span->set_remote_side(cntl->remote_side());
        span->set_protocol(PROTOCOL_BAIDU_STD);
        span->set_received_us(msg->received_us());
        span->set_start_parse_us(start_parse_us);
        span->set_request_size(msg->payload.size() + msg->meta.size() + 12);
    }

    MethodStatus* method_status = NULL;
    do {
        if (!server->IsRunning()) {
            cntl->SetFailed(ELOGOFF, "Server is stopping");
            break;
        }

        if (!server_accessor.AddConcurrency(cntl.get())) {
            cntl->SetFailed(
                ELIMIT, "Reached server's max_concurrency=%d",
                server->options().max_concurrency);
            break;
        }

        if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
            cntl->SetFailed(ELIMIT, "Too many user code to run when"
                            " -usercode_in_pthread is on");
            break;
        }

        const int req_size = static_cast<int>(msg->payload.size());
        if (meta.has_attachment_size()) {
            if (req_size < meta.attachment_size()) {
                cntl->SetFailed(EREQUEST,
                    "attachment_size=%d is larger than request_size=%d",
                    meta.attachment_size(), req_size);
                break;
            }
        }

        google::protobuf::Service* svc = NULL;
        google::protobuf::MethodDescriptor* method = NULL;
        if (NULL != server->options().baidu_master_service) {
          if (socket->is_overcrowded() &&
              !server->options().ignore_eovercrowded &&
              !server->options().baidu_master_service->ignore_eovercrowded()) {
            cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
                            butil::endpoint2str(socket->remote_side()).c_str());
            break;
          }
            svc = server->options().baidu_master_service;
            auto sampled_request = new (std::nothrow) SampledRequest;
            if (NULL == sampled_request) {
                cntl->SetFailed(ENOMEM, "Fail to get sampled_request");
                break;
            }
            sampled_request->meta.set_service_name(request_meta.service_name());
            sampled_request->meta.set_method_name(request_meta.method_name());
            cntl->reset_sampled_request(sampled_request);
            // Switch to service-specific error.
            non_service_error.release();
            method_status = server->options().baidu_master_service->_status;
            if (method_status) {
                int rejected_cc = 0;
                if (!method_status->OnRequested(&rejected_cc, cntl.get())) {
                    cntl->SetFailed(
                        ELIMIT,
                        "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
                        butil::class_name<BaiduMasterService>(), rejected_cc);
                    break;
                }
            }
            if (span) {
                span->ResetServerSpanName(sampled_request->meta.method_name());
            }

            messages = BaiduProxyPBMessages::Get();
            msg->payload.cutn(
                &((SerializedRequest*)messages->Request())->serialized_data(),
                req_size - meta.attachment_size());
            if (!msg->payload.empty()) {
                cntl->request_attachment().swap(msg->payload);
            }
        } else {
            // NOTE(gejun): jprotobuf sends service names without packages. So the
            // name should be changed to full when it's not.
            butil::StringPiece svc_name(request_meta.service_name());
            if (svc_name.find('.') == butil::StringPiece::npos) {
                const Server::ServiceProperty* sp =
                    server_accessor.FindServicePropertyByName(svc_name);
                if (NULL == sp) {
                    cntl->SetFailed(ENOSERVICE, "Fail to find service=%s",
                        request_meta.service_name().c_str());
                    break;
                }
                svc_name = sp->service->GetDescriptor()->full_name();
            }
            const Server::MethodProperty* mp =
                server_accessor.FindMethodPropertyByFullName(
                    svc_name, request_meta.method_name());
            if (NULL == mp) {
                cntl->SetFailed(ENOMETHOD, "Fail to find method=%s/%s",
                                request_meta.service_name().c_str(),
                                request_meta.method_name().c_str());
                break;
            } else if (mp->service->GetDescriptor() == BadMethodService::descriptor()) {
                BadMethodRequest breq;
                BadMethodResponse bres;
                breq.set_service_name(request_meta.service_name());
                mp->service->CallMethod(mp->method, cntl.get(), &breq, &bres, NULL);
                break;
            }
            if (socket->is_overcrowded() &&
                !server->options().ignore_eovercrowded &&
                !mp->ignore_eovercrowded) {
              cntl->SetFailed(
                  EOVERCROWDED, "Connection to %s is overcrowded",
                  butil::endpoint2str(socket->remote_side()).c_str());
              break;
            }
            // Switch to service-specific error.
            non_service_error.release();
            method_status = mp->status;
            if (method_status) {
                int rejected_cc = 0;
                if (!method_status->OnRequested(&rejected_cc, cntl.get())) {
                    cntl->SetFailed(
                        ELIMIT,
                        "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
                        mp->method->full_name().c_str(), rejected_cc);
                    break;
                }
            }
            svc = mp->service;
            method = const_cast<google::protobuf::MethodDescriptor*>(mp->method);
            accessor.set_method(method);

            if (span) {
                span->ResetServerSpanName(method->full_name());
            }

            if (!server->AcceptRequest(cntl.get())) {
                break;
            }

            butil::IOBuf req_buf;
            int body_without_attachment_size = req_size - meta.attachment_size();
            msg->payload.cutn(&req_buf, body_without_attachment_size);
            if (meta.attachment_size() > 0) {
                cntl->request_attachment().swap(msg->payload);
            }

            ContentType content_type = meta.content_type();
            auto compress_type = static_cast<CompressType>(meta.compress_type());
            messages = server->options().rpc_pb_message_factory->Get(*svc, *method);
            if (!DeserializeRpcMessage(req_buf, *cntl, content_type,
                                       compress_type, messages->Request())) {
                cntl->SetFailed(
                    EREQUEST, "Fail to parse request=%s, ContentType=%s, "
                              "CompressType=%s, request_size=%d",
                    messages->Request()->GetDescriptor()->full_name().c_str(),
                    ContentTypeToCStr(content_type),
                    CompressTypeToCStr(compress_type), req_size);
                break;
            }
            req_buf.clear();
        }

        // `socket' will be held until response has been sent
        google::protobuf::Closure* done = ::brpc::NewCallback<
            int64_t, Controller*, RpcPBMessages*,
            const Server*, MethodStatus*, int64_t>(
                &SendRpcResponse, meta.correlation_id(),cntl.get(),
                messages, server, method_status, msg->received_us());

        // optional, just release resource ASAP
        msg.reset();

        if (span) {
            span->set_start_callback_us(butil::cpuwide_time_us());
            span->AsParent();
        }
        if (!FLAGS_usercode_in_pthread) {
            return svc->CallMethod(method, cntl.release(), 
                                   messages->Request(),
                                   messages->Response(), done);
        }
        if (BeginRunningUserCode()) {
            svc->CallMethod(method, cntl.release(), 
                            messages->Request(),
                            messages->Response(), done);
            return EndRunningUserCodeInPlace();
        } else {
            return EndRunningCallMethodInPool(
                svc, method, cntl.release(),
                messages->Request(),
                messages->Response(), done);
        }
    } while (false);
    
    // `cntl', `req' and `res' will be deleted inside `SendRpcResponse'
    // `socket' will be held until response has been sent
    SendRpcResponse(meta.correlation_id(),
                    cntl.release(), messages,
                    server, method_status,
                    msg->received_us());
}