void ProcessHuluRequest()

in src/brpc/policy/hulu_pbrpc_protocol.cpp [341:547]


// Server-side handler for one received hulu-pbrpc request.
//
// Flow, in order:
//   1. Parse the HuluRpcRequestMeta out of msg->meta; on failure the socket
//      is marked failed and nothing is replied.
//   2. If the sampler wants this request, copy meta fields and the payload
//      into the SampledRequest and submit it.
//   3. Create a HuluController and fill it from the meta and the socket.
//   4. Optionally create a server Span for tracing.
//   5. Run the admission checks inside a breakable `do { } while (false)`.
//      Any failed check records an error on the controller and breaks out to
//      the SendHuluResponse() call at the bottom of the function.
//   6. If every check passes, parse the request body, build the `done`
//      closure (which calls SendHuluResponse) and invoke the user's service
//      method; the function returns from inside the block in that case.
//
// msg_base: static_cast to MostCommonMessage without a runtime check, so the
//           caller must pass that type. msg_base->arg() is read as the
//           `const Server*` that owns this request. The message is wrapped in
//           a DestroyingPtr here, so the caller must not use it afterwards.
void ProcessHuluRequest(InputMessageBase* msg_base) {
    // Taken before any parsing; forwarded to the sampler and to the span.
    const int64_t start_parse_us = butil::cpuwide_time_us();
    DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base));
    // The socket reference is taken out of the message and held by
    // `socket_guard`; `socket` is a raw alias used for the rest of the function.
    SocketUniquePtr socket_guard(msg->ReleaseSocket());
    Socket* socket = socket_guard.get();
    const Server* server = static_cast<const Server*>(msg_base->arg());
    // NOTE(review): presumably counts this request as a server-level
    // (non-service) error on scope exit unless release() is called below --
    // confirm against ScopedNonServiceError.
    ScopedNonServiceError non_service_error(server);

    HuluRpcRequestMeta meta;
    if (!ParsePbFromIOBuf(&meta, msg->meta)) {
        // Without a parsed meta there is no correlation_id to reply with, so
        // the connection is failed instead of sending a response.
        LOG(WARNING) << "Fail to parse HuluRpcRequestMeta, close the connection";
        socket->SetFailed();
        return;
    }

    // Map the hulu-specific compress enum onto the generic CompressType.
    const CompressType req_cmp_type = Hulu2CompressType((HuluCompressType)meta.compress_type());
    // A non-NULL result means this request should be recorded.
    SampledRequest* sample = AskToBeSampled();
    if (sample) {
        sample->meta.set_service_name(meta.service_name());
        sample->meta.set_method_index(meta.method_index());
        sample->meta.set_compress_type(req_cmp_type);
        sample->meta.set_protocol_type(PROTOCOL_HULU_PBRPC);
        sample->meta.set_user_data(meta.user_data());
        // The payload is <user message><attachment>. An attachment size is
        // recorded only when the declared user-message size is strictly
        // smaller than the payload; the cast to size_t makes a negative
        // declared size compare as huge and therefore skip this branch.
        if (meta.has_user_message_size()
            && static_cast<size_t>(meta.user_message_size()) < msg->payload.size()) {
            size_t attachment_size = msg->payload.size() - meta.user_message_size();
            sample->meta.set_attachment_size(attachment_size);
        }
        // Copy-assigned (not swapped): msg->payload is still needed below.
        sample->request = msg->payload;
        sample->submit(start_parse_us);
    }

    // nothrow-new so that allocation failure yields NULL instead of throwing.
    // On failure the function returns without replying to the client.
    std::unique_ptr<HuluController> cntl(new (std::nothrow) HuluController());
    if (NULL == cntl.get()) {
        LOG(WARNING) << "Fail to new Controller";
        return;
    }
    // Left empty until the target method is known. On the error paths below
    // they may still be empty when released into SendHuluResponse().
    std::unique_ptr<google::protobuf::Message> req;
    std::unique_ptr<google::protobuf::Message> res;

    ServerPrivateAccessor server_accessor(server);
    ControllerPrivateAccessor accessor(cntl.get());
    // Passed to SendHuluResponse() on both the success and error paths.
    int64_t correlation_id = meta.correlation_id();
    // Security mode applies only when the server option is on AND the socket's
    // user is this server's acceptor.
    const bool security_mode = server->options().security_mode() &&
                               socket->user() == server_accessor.acceptor();
    if (meta.has_log_id()) {
        cntl->set_log_id(meta.log_id());
    }
    cntl->set_request_compress_type(req_cmp_type);
    // The last call in the chain hands `socket_guard` to the controller.
    // `socket` (the raw alias) is still dereferenced afterwards.
    // NOTE(review): presumably the controller keeps the reference alive until
    // the response is sent -- confirm in ControllerPrivateAccessor.
    accessor.set_server(server)
        .set_security_mode(security_mode)
        .set_peer_id(socket->id())
        .set_remote_side(socket->remote_side())
        .set_local_side(socket->local_side())
        .set_auth_context(socket->auth_context())
        .set_request_protocol(PROTOCOL_HULU_PBRPC)
        .set_begin_time_us(msg->received_us())
        .move_in_server_receiving_sock(socket_guard);

    if (meta.has_user_data()) {
        cntl->set_request_user_data(meta.user_data());
    }

    if (meta.has_user_defined_source_addr()) {
        cntl->set_request_source_addr(meta.user_defined_source_addr());
    }
    

    // Tag the bthread with this server's key for thread_local_data().
    if (server->thread_local_options().thread_local_data_factory) {
        bthread_assign_data((void*)&server->thread_local_options());
    }

    // Tracing is optional; `span` stays NULL when the request is not traceable
    // and every later use is guarded by `if (span)`.
    Span* span = NULL;
    if (IsTraceable(meta.has_trace_id())) {
        span = Span::CreateServerSpan(
            meta.trace_id(), meta.span_id(), meta.parent_span_id(),
            msg->base_real_us());
        accessor.set_span(span);
        span->set_log_id(meta.log_id());
        span->set_remote_side(cntl->remote_side());
        span->set_protocol(PROTOCOL_HULU_PBRPC);
        span->set_received_us(msg->received_us());
        span->set_start_parse_us(start_parse_us);
        // NOTE(review): the +12 is presumably the fixed-size hulu frame header
        // preceding meta and payload -- confirm against the parser.
        span->set_request_size(msg->payload.size() + msg->meta.size() + 12);
    }

    // Stays NULL for failures detected before the method is resolved, so
    // SendHuluResponse() receives NULL in those cases.
    MethodStatus* method_status = NULL;
    // Breakable block: every `break` jumps to the SendHuluResponse() call
    // after the loop; the success path returns from inside the block.
    do {
        if (!server->IsRunning()) {
            cntl->SetFailed(ELOGOFF, "Server is stopping");
            break;
        }

        // Server-wide concurrency limit.
        if (!server_accessor.AddConcurrency(cntl.get())) {
            cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
                            server->options().max_concurrency);
            break;
        }
        if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
            cntl->SetFailed(ELIMIT, "Too many user code to run when"
                            " -usercode_in_pthread is on");
            break;
        }
        
        // Hulu addresses methods by (service name, method index).
        const Server::MethodProperty *sp =
            server_accessor.FindMethodPropertyByNameAndIndex(
                meta.service_name(), meta.method_index());
        if (NULL == sp) {
            cntl->SetFailed(ENOMETHOD, "Fail to find method=%d of service=%s",
                            meta.method_index(), meta.service_name().c_str());
            break;
        } else if (sp->service->GetDescriptor()
                   == BadMethodService::descriptor()) {
            // The lookup resolved to BadMethodService: call it synchronously
            // with stack-allocated messages and a NULL `done`, then fall
            // through to the common response path.
            BadMethodRequest breq;
            BadMethodResponse bres;
            breq.set_service_name(meta.service_name());
            sp->service->CallMethod(sp->method, cntl.get(), &breq, &bres, NULL);
            break;
        }
        // Reject on an overcrowded connection unless either the server-level
        // or the method-level option says to ignore it.
        if (socket->is_overcrowded() &&
            !server->options().ignore_eovercrowded &&
            !sp->ignore_eovercrowded) {
            cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
                            butil::endpoint2str(socket->remote_side()).c_str());
            break;
        }

        // Switch to service-specific error.
        non_service_error.release();
        method_status = sp->status;
        if (method_status) {
            // Per-method concurrency limit; `rejected_cc` is filled in by
            // OnRequested() and only used in the error text.
            int rejected_cc = 0;
            if (!method_status->OnRequested(&rejected_cc)) {
                cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
                                sp->method->full_name().c_str(), rejected_cc);
                break;
            }
        }
        
        google::protobuf::Service* svc = sp->service;
        const google::protobuf::MethodDescriptor* method = sp->method;
        accessor.set_method(method);

        // No SetFailed() here on rejection.
        // NOTE(review): presumably AcceptRequest() records the error on `cntl`
        // itself -- confirm, otherwise a success response would be sent.
        if (!server->AcceptRequest(cntl.get())) {
            break;
        }

        if (span) {
            span->ResetServerSpanName(method->full_name());
        }
        // Captured before the payload is cut below; used only in the parse
        // error message.
        const int reqsize = msg->payload.length();
        butil::IOBuf req_buf;
        butil::IOBuf* req_buf_ptr = &msg->payload;
        if (meta.has_user_message_size()) {
            // Move the first user_message_size bytes into `req_buf` as the
            // request body; what remains in the payload is the attachment and
            // is swapped into the controller. Without a declared size, the
            // whole payload is parsed as the request.
            msg->payload.cutn(&req_buf, meta.user_message_size());
            req_buf_ptr = &req_buf;
            cntl->request_attachment().swap(msg->payload);
        }

        req.reset(svc->GetRequestPrototype(method).New());
        if (!ParseFromCompressedData(*req_buf_ptr, req.get(), req_cmp_type)) {
            cntl->SetFailed(EREQUEST, "Fail to parse request message, "
                            "CompressType=%s, request_size=%d", 
                            CompressTypeToCStr(req_cmp_type), reqsize);
            break;
        }

        res.reset(svc->GetResponsePrototype(method).New());
        // `socket' will be held until response has been sent
        // The closure binds the raw cntl/req/res pointers, which are released
        // from their unique_ptrs when CallMethod is invoked below.
        // msg->received_us() must be read here, before msg.reset().
        google::protobuf::Closure* done = ::brpc::NewCallback<
            int64_t, HuluController*, const google::protobuf::Message*,
            const google::protobuf::Message*, const Server*,
                  MethodStatus *, int64_t>(
                &SendHuluResponse, correlation_id, cntl.get(),
                req.get(), res.get(), server,
                method_status, msg->received_us());

        // optional, just release resource ASAP
        msg.reset();
        req_buf.clear();

        if (span) {
            span->set_start_callback_us(butil::cpuwide_time_us());
            span->AsParent();
        }
        // Default mode: run the user's method directly on this thread.
        if (!FLAGS_usercode_in_pthread) {
            return svc->CallMethod(method, cntl.release(), 
                                   req.release(), res.release(), done);
        }
        // -usercode_in_pthread mode: run in place when BeginRunningUserCode()
        // permits it, otherwise hand the call to EndRunningCallMethodInPool().
        if (BeginRunningUserCode()) {
            svc->CallMethod(method, cntl.release(), 
                            req.release(), res.release(), done);
            return EndRunningUserCodeInPlace();
        } else {
            return EndRunningCallMethodInPool(
                svc, method, cntl.release(),
                req.release(), res.release(), done);
        }
    } while (false);

    // Error path only: `msg` is still alive here because msg.reset() is
    // reached only on the success path, which always returns.
    // `cntl', `req' and `res' will be deleted inside `SendHuluResponse'
    // `socket' will be held until response has been sent
    SendHuluResponse(correlation_id, cntl.release(),
                     req.release(), res.release(), server,
                     method_status, msg->received_us());
}