in src/brpc/policy/nshead_protocol.cpp [221:355]
// Server-side handler for one already-parsed nshead request.
//
// `msg_base` is downcast to MostCommonMessage and wrapped in a DestroyingPtr
// for the duration of this function; `msg_base->arg()` is used as the
// `const Server*` that received the request.
//
// Outline:
//   1. Fetch the nshead header out of `msg->meta`.
//   2. Look up ServerOptions.nshead_service; without one the socket is failed.
//   3. Allocate one block holding an NsheadClosure plus the service's
//      `_additional_space` bytes, and construct the closure in place.
//   4. Optionally submit the request to the sampler.
//   5. Fill the closure's controller/request from the message and socket.
//   6. Run the admission checks (server running, overcrowded, concurrency,
//      user-code limit, AcceptRequest). A failing check only marks the
//      controller as failed.
//   7. Invoke service->ProcessNsheadRequest() -- this happens even when a
//      check in step 6 failed, so the service sees the failed controller.
//
// The statement order matters in several places (see inline notes); be
// careful when reordering.
void ProcessNsheadRequest(InputMessageBase* msg_base) {
// Taken first so that it can be reported as the start of parsing to the
// sampler and the span below.
const int64_t start_parse_us = butil::cpuwide_time_us();
DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base));
// `socket_guard` holds what ReleaseSocket() returned; `socket` is a raw alias
// that keeps being used after the guard is handed to the controller below.
SocketUniquePtr socket_guard(msg->ReleaseSocket());
Socket* socket = socket_guard.get();
const Server* server = static_cast<const Server*>(msg_base->arg());
// Active until release() is called below, i.e. for the early-return paths
// that happen before the request is attributed to the service.
ScopedNonServiceError non_service_error(server);
// Fetch sizeof(nshead_t) bytes of header from `msg->meta`, offering `buf` as
// scratch space, and reinterpret the result as the nshead header.
// NOTE(review): the returned pointer is not checked for NULL; presumably the
// parser guarantees that meta holds a full header -- confirm.
// NOTE(review): `p`/`req_head` may point into `msg->meta` rather than `buf`,
// so they should not be used after `msg.reset()` -- confirm against fetch().
char buf[sizeof(nshead_t)];
const char *p = (const char *)msg->meta.fetch(buf, sizeof(buf));
const nshead_t *req_head = (const nshead_t *)p;
NsheadService* service = server->options().nshead_service;
if (service == NULL) {
LOG_EVERY_SECOND(WARNING)
<< "Received nshead request however the server does not set"
" ServerOptions.nshead_service, close the connection.";
socket->SetFailed();
return;
}
// Single allocation: the NsheadClosure object followed by
// `service->_additional_space` extra bytes (exposed as `sub_space` below).
void* space = malloc(sizeof(NsheadClosure) + service->_additional_space);
if (!space) {
LOG(FATAL) << "Fail to new NsheadClosure";
socket->SetFailed();
return;
}
// for nshead sample request
// The sample copies `msg->payload`, so this must run before the payload is
// swapped into `req->body` further down.
SampledRequest* sample = AskToBeSampled();
if (sample) {
sample->meta.set_protocol_type(PROTOCOL_NSHEAD);
sample->meta.set_nshead(p, sizeof(nshead_t)); // nshead
sample->request = msg->payload;
sample->submit(start_parse_us);
}
// Switch to service-specific error.
non_service_error.release();
MethodStatus* method_status = service->_status;
if (method_status) {
// NOTE(review): OnRequested() is called inside CHECK(); this relies on CHECK
// always evaluating its argument -- confirm for all build modes.
CHECK(method_status->OnRequested());
}
// `sub_space` points just past the closure inside `space`, or stays NULL when
// the service asked for no additional space.
void* sub_space = NULL;
if (service->_additional_space) {
sub_space = (char*)space + sizeof(NsheadClosure);
}
// Placement-new the closure into the malloc'ed block; the controller,
// request and response used below are members of that closure.
NsheadClosure* nshead_done = new (space) NsheadClosure(sub_space);
Controller* cntl = &(nshead_done->_controller);
NsheadMessage* req = &(nshead_done->_request);
NsheadMessage* res = &(nshead_done->_response);
// The header is copied by value; the body is taken by swapping buffers, so
// after this `msg->payload` holds whatever `req->body` held before.
req->head = *req_head;
msg->payload.swap(req->body);
nshead_done->_received_us = msg->received_us();
nshead_done->_server = server;
ServerPrivateAccessor server_accessor(server);
ControllerPrivateAccessor accessor(cntl);
// Security mode applies only when the server option is on AND the socket's
// user is this server's acceptor.
const bool security_mode = server->options().security_mode() &&
socket->user() == server_accessor.acceptor();
// Initialize log_id with the log_id in nshead. Notice that the protocols
// on top of NsheadService may pack log_id in meta or user messages and
// overwrite the value.
cntl->set_log_id(req_head->log_id);
// NOTE(review): move_in_server_receiving_sock() is the last call in the chain
// and takes `socket_guard`; presumably the controller keeps the socket alive
// from here on, since the raw `socket` pointer is still dereferenced in the
// admission checks below -- confirm.
accessor.set_server(server)
.set_security_mode(security_mode)
.set_peer_id(socket->id())
.set_remote_side(socket->remote_side())
.set_local_side(socket->local_side())
.set_request_protocol(PROTOCOL_NSHEAD)
.set_begin_time_us(msg->received_us())
.move_in_server_receiving_sock(socket_guard);
// Tag the bthread with this server's key for thread_local_data().
if (server->thread_local_options().thread_local_data_factory) {
bthread_assign_data((void*)&server->thread_local_options());
}
// Tracing is optional: `span` stays NULL unless IsTraceable(false) is true.
Span* span = NULL;
if (IsTraceable(false)) {
// NOTE(review): the result of CreateServerSpan() is dereferenced without a
// NULL check -- confirm it cannot return NULL.
span = Span::CreateServerSpan(0, 0, 0, msg->base_real_us());
accessor.set_span(span);
span->set_log_id(req_head->log_id);
span->set_remote_side(cntl->remote_side());
span->set_protocol(PROTOCOL_NSHEAD);
span->set_received_us(msg->received_us());
span->set_start_parse_us(start_parse_us);
// Request size is the fixed header plus the body length declared in it.
span->set_request_size(sizeof(nshead_t) + req_head->body_len);
}
// Admission checks. do/while(false) exists only so that the first failing
// check can `break` past the remaining ones. None of them returns: the
// service is still invoked afterwards with the (possibly failed) controller.
do {
if (!server->IsRunning()) {
cntl->SetFailed(ELOGOFF, "Server is stopping");
break;
}
if (socket->is_overcrowded() && !server->options().ignore_eovercrowded) {
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
break;
}
if (!server_accessor.AddConcurrency(cntl)) {
cntl->SetFailed(
ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency);
break;
}
if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
cntl->SetFailed(ELIMIT, "Too many user code to run when"
" -usercode_in_pthread is on");
break;
}
// No SetFailed() here on rejection.
// NOTE(review): presumably AcceptRequest() sets the error on `cntl` itself
// when it returns false -- confirm.
if (!server->AcceptRequest(cntl)) {
break;
}
} while (false);
// `req_head` is not used past this point; everything still needed from the
// message has been copied or swapped into the closure above.
msg.reset(); // optional, just release resource ASAP
if (span) {
span->ResetServerSpanName(service->_cached_name);
span->set_start_callback_us(butil::cpuwide_time_us());
span->AsParent();
}
// Dispatch to the user's service. With -usercode_in_pthread off the service
// is called directly.
if (!FLAGS_usercode_in_pthread) {
return service->ProcessNsheadRequest(*server, cntl, *req, res, nshead_done);
}
// With -usercode_in_pthread on, BeginRunningUserCode() decides between
// running the service in place (paired with EndRunningUserCodeInPlace()) and
// handing the call to EndRunningCallMethodInPool().
if (BeginRunningUserCode()) {
service->ProcessNsheadRequest(*server, cntl, *req, res, nshead_done);
return EndRunningUserCodeInPlace();
} else {
return EndRunningCallMethodInPool(
service, *server, cntl, *req, res, nshead_done);
}
}