void ProcessNsheadRequest()

in src/brpc/policy/nshead_protocol.cpp [221:355]


// Server-side entry point for one already-parsed nshead request.
//
// Takes ownership of `msg_base`, looks up the server's NsheadService, builds
// an NsheadClosure (controller + request + response) inside a single
// malloc'ed block, fills in the controller (and an optional tracing span),
// runs the admission checks, and finally hands everything to
// NsheadService::ProcessNsheadRequest -- either directly or through the
// "user code in pthread" path when -usercode_in_pthread is on.
//
// msg_base: the parsed message. It is downcast to MostCommonMessage and
//           destroyed through DestroyingPtr; msg_base->arg() is read as the
//           owning `const Server*`.
//
// Statement order matters in several places below (sampling before the
// payload swap, last use of req_head before msg.reset(), ...); see the
// inline notes before reordering anything.
void ProcessNsheadRequest(InputMessageBase* msg_base) {
    // Taken first so the span can report when parsing/processing started.
    const int64_t start_parse_us = butil::cpuwide_time_us();   

    // `msg` owns the message for the rest of the function (released early at
    // msg.reset() below). The socket is detached from the message into
    // `socket_guard`; the raw `socket` pointer is kept because it is still
    // used after socket_guard is moved into the controller further down.
    DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base));
    SocketUniquePtr socket_guard(msg->ReleaseSocket());
    Socket* socket = socket_guard.get();
    const Server* server = static_cast<const Server*>(msg_base->arg());
    // Armed until release() is called below, i.e. it covers the early-return
    // paths where no service-level accounting happens.
    // NOTE(review): presumably counts an error against `server` on
    // destruction unless released -- confirm in ScopedNonServiceError.
    ScopedNonServiceError non_service_error(server);
    
    // View the fixed-size nshead header stored in msg->meta.
    // NOTE(review): `p` is presumably either `buf` or a pointer into
    // msg->meta's own storage, so `p`/`req_head` should be treated as valid
    // only while both `buf` and `msg` are alive -- confirm fetch() semantics.
    // The result is dereferenced without a null check.
    char buf[sizeof(nshead_t)];
    const char *p = (const char *)msg->meta.fetch(buf, sizeof(buf));
    const nshead_t *req_head = (const nshead_t *)p;

    // Without a configured NsheadService there is nobody to answer the
    // request, so the socket is failed and the request dropped.
    NsheadService* service = server->options().nshead_service;
    if (service == NULL) {
        LOG_EVERY_SECOND(WARNING) 
            << "Received nshead request however the server does not set"
            " ServerOptions.nshead_service, close the connection.";
        socket->SetFailed();
        return;
    }
    // One allocation holds the NsheadClosure followed by
    // service->_additional_space extra bytes (exposed as `sub_space` below).
    void* space = malloc(sizeof(NsheadClosure) + service->_additional_space);
    if (!space) {
        LOG(FATAL) << "Fail to new NsheadClosure";
        socket->SetFailed();
        return;
    }

    // for nshead sample request
    // Must run before msg->payload is swapped into the request below,
    // because the sample takes its body from msg->payload.
    SampledRequest* sample = AskToBeSampled();
    if (sample) {
        sample->meta.set_protocol_type(PROTOCOL_NSHEAD);
        sample->meta.set_nshead(p, sizeof(nshead_t)); // nshead
        sample->request = msg->payload;
        sample->submit(start_parse_us);
    }

    // Switch to service-specific error.
    non_service_error.release();
    MethodStatus* method_status = service->_status;
    if (method_status) {
        // NOTE(review): OnRequested() is called inside CHECK(); this relies
        // on CHECK evaluating its argument in every build mode -- confirm.
        CHECK(method_status->OnRequested());
    }
    
    // The extra bytes (if any) start right after the closure object.
    void* sub_space = NULL;
    if (service->_additional_space) {
        sub_space = (char*)space + sizeof(NsheadClosure);
    }
    // Placement-new the closure into the malloc'ed block. Nothing in this
    // function frees `space`; the closure is handed to the service as
    // `nshead_done`.
    // NOTE(review): presumably the closure releases the block when it is
    // run -- confirm in NsheadClosure.
    NsheadClosure* nshead_done = new (space) NsheadClosure(sub_space);
    Controller* cntl = &(nshead_done->_controller);
    NsheadMessage* req = &(nshead_done->_request);
    NsheadMessage* res = &(nshead_done->_response);

    // The header is copied by value; the body is exchanged with req->body,
    // so msg->payload no longer holds the request body after this point.
    req->head = *req_head;
    msg->payload.swap(req->body);
    nshead_done->_received_us = msg->received_us();
    nshead_done->_server = server;
    
    ServerPrivateAccessor server_accessor(server);
    ControllerPrivateAccessor accessor(cntl);
    // Security mode applies only when the server option is on AND the
    // socket's user is this server's acceptor.
    const bool security_mode = server->options().security_mode() &&
                               socket->user() == server_accessor.acceptor();
    // Initialize log_id with the log_id in nshead. Notice that the protocols
    // on top of NsheadService may pack log_id in meta or user messages and
    // overwrite the value.
    cntl->set_log_id(req_head->log_id);
    // The last call in the chain moves socket_guard into the controller;
    // the raw `socket` pointer is still dereferenced afterwards (overcrowded
    // check below).
    // NOTE(review): presumably the controller now keeps the socket alive --
    // confirm.
    accessor.set_server(server)
        .set_security_mode(security_mode)
        .set_peer_id(socket->id())
        .set_remote_side(socket->remote_side())
        .set_local_side(socket->local_side())
        .set_request_protocol(PROTOCOL_NSHEAD)
        .set_begin_time_us(msg->received_us())
        .move_in_server_receiving_sock(socket_guard);

    // Tag the bthread with this server's key for thread_local_data().
    if (server->thread_local_options().thread_local_data_factory) {
        bthread_assign_data((void*)&server->thread_local_options());
    }

    // Optional tracing: when the request is traceable, create a server span
    // and record the request's identity, timestamps and size on it.
    Span* span = NULL;
    if (IsTraceable(false)) {
        span = Span::CreateServerSpan(0, 0, 0, msg->base_real_us());
        accessor.set_span(span);
        span->set_log_id(req_head->log_id);
        span->set_remote_side(cntl->remote_side());
        span->set_protocol(PROTOCOL_NSHEAD);
        span->set_received_us(msg->received_us());
        span->set_start_parse_us(start_parse_us);
        // Size is the header plus the body length declared in the header.
        span->set_request_size(sizeof(nshead_t) + req_head->body_len);
    }

    // Admission checks. do { ... } while (false) is only a breakable block:
    // the first failing check marks the controller as failed and skips the
    // remaining ones. Note that `break` does NOT return -- the service
    // callback below is still invoked, with a controller that may already
    // be failed.
    do {
        if (!server->IsRunning()) {
            cntl->SetFailed(ELOGOFF, "Server is stopping");
            break;
        }
        if (socket->is_overcrowded() && !server->options().ignore_eovercrowded) {
            cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
                            butil::endpoint2str(socket->remote_side()).c_str());
            break;
        }
        if (!server_accessor.AddConcurrency(cntl)) {
            cntl->SetFailed(
                ELIMIT, "Reached server's max_concurrency=%d",
                server->options().max_concurrency);
            break;
        }
        if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
            cntl->SetFailed(ELIMIT, "Too many user code to run when"
                            " -usercode_in_pthread is on");
            break;
        }
        // No SetFailed() here.
        // NOTE(review): presumably AcceptRequest() records the failure on
        // `cntl` itself -- confirm.
        if (!server->AcceptRequest(cntl)) {
            break;
        }
    } while (false);

    // `p` / `req_head` are not used past this point; they may point into
    // storage owned by `msg` (see the note at fetch() above), so do not add
    // uses of them after this line.
    msg.reset();  // optional, just release resource ASAP
    if (span) {
        span->ResetServerSpanName(service->_cached_name);
        span->set_start_callback_us(butil::cpuwide_time_us());
        span->AsParent();
    }
    // Dispatch to the user's service. With -usercode_in_pthread off the
    // callback runs right here.
    if (!FLAGS_usercode_in_pthread) {
        return service->ProcessNsheadRequest(*server, cntl, *req, res, nshead_done);
    }
    // With -usercode_in_pthread on: run the callback in place when
    // BeginRunningUserCode() allows it, otherwise hand the call over to
    // EndRunningCallMethodInPool().
    if (BeginRunningUserCode()) {
        service->ProcessNsheadRequest(*server, cntl, *req, res, nshead_done);
        return EndRunningUserCodeInPlace();
    } else {
        return EndRunningCallMethodInPool(
            service, *server, cntl, *req, res, nshead_done);
    }
}