in src/brpc/policy/baidu_rpc_protocol.cpp [552:839]
void ProcessRpcRequest(InputMessageBase* msg_base) {
const int64_t start_parse_us = butil::cpuwide_time_us();
DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base));
SocketUniquePtr socket_guard(msg->ReleaseSocket());
Socket* socket = socket_guard.get();
const Server* server = static_cast<const Server*>(msg_base->arg());
ScopedNonServiceError non_service_error(server);
RpcMeta meta;
if (!ParsePbFromIOBuf(&meta, msg->meta)) {
LOG(WARNING) << "Fail to parse RpcMeta from " << *socket;
socket->SetFailed(EREQUEST, "Fail to parse RpcMeta from %s",
socket->description().c_str());
return;
}
const RpcRequestMeta &request_meta = meta.request();
SampledRequest* sample = AskToBeSampled();
if (sample) {
sample->meta.set_service_name(request_meta.service_name());
sample->meta.set_method_name(request_meta.method_name());
sample->meta.set_compress_type((CompressType)meta.compress_type());
sample->meta.set_protocol_type(PROTOCOL_BAIDU_STD);
sample->meta.set_attachment_size(meta.attachment_size());
sample->meta.set_authentication_data(meta.authentication_data());
sample->request = msg->payload;
sample->submit(start_parse_us);
}
std::unique_ptr<Controller> cntl(new (std::nothrow) Controller);
if (NULL == cntl.get()) {
LOG(WARNING) << "Fail to new Controller";
return;
}
RpcPBMessages* messages = NULL;
ServerPrivateAccessor server_accessor(server);
ControllerPrivateAccessor accessor(cntl.get());
const bool security_mode = server->options().security_mode() &&
socket->user() == server_accessor.acceptor();
if (request_meta.has_log_id()) {
cntl->set_log_id(request_meta.log_id());
}
if (request_meta.has_request_id()) {
cntl->set_request_id(request_meta.request_id());
}
if (request_meta.has_timeout_ms()) {
cntl->set_timeout_ms(request_meta.timeout_ms());
}
cntl->set_request_content_type(meta.content_type());
cntl->set_request_compress_type((CompressType)meta.compress_type());
accessor.set_server(server)
.set_security_mode(security_mode)
.set_peer_id(socket->id())
.set_remote_side(socket->remote_side())
.set_local_side(socket->local_side())
.set_auth_context(socket->auth_context())
.set_request_protocol(PROTOCOL_BAIDU_STD)
.set_begin_time_us(msg->received_us())
.move_in_server_receiving_sock(socket_guard);
if (meta.has_stream_settings()) {
accessor.set_remote_stream_settings(meta.release_stream_settings());
}
if (!meta.user_fields().empty()) {
for (const auto& it : meta.user_fields()) {
(*cntl->request_user_fields())[it.first] = it.second;
}
}
// Tag the bthread with this server's key for thread_local_data().
if (server->thread_local_options().thread_local_data_factory) {
bthread_assign_data((void*)&server->thread_local_options());
}
Span* span = NULL;
if (IsTraceable(request_meta.has_trace_id())) {
span = Span::CreateServerSpan(
request_meta.trace_id(), request_meta.span_id(),
request_meta.parent_span_id(), msg->base_real_us());
accessor.set_span(span);
span->set_log_id(request_meta.log_id());
span->set_remote_side(cntl->remote_side());
span->set_protocol(PROTOCOL_BAIDU_STD);
span->set_received_us(msg->received_us());
span->set_start_parse_us(start_parse_us);
span->set_request_size(msg->payload.size() + msg->meta.size() + 12);
}
MethodStatus* method_status = NULL;
do {
if (!server->IsRunning()) {
cntl->SetFailed(ELOGOFF, "Server is stopping");
break;
}
if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(
ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency);
break;
}
if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
cntl->SetFailed(ELIMIT, "Too many user code to run when"
" -usercode_in_pthread is on");
break;
}
const int req_size = static_cast<int>(msg->payload.size());
if (meta.has_attachment_size()) {
if (req_size < meta.attachment_size()) {
cntl->SetFailed(EREQUEST,
"attachment_size=%d is larger than request_size=%d",
meta.attachment_size(), req_size);
break;
}
}
google::protobuf::Service* svc = NULL;
google::protobuf::MethodDescriptor* method = NULL;
if (NULL != server->options().baidu_master_service) {
if (socket->is_overcrowded() &&
!server->options().ignore_eovercrowded &&
!server->options().baidu_master_service->ignore_eovercrowded()) {
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
break;
}
svc = server->options().baidu_master_service;
auto sampled_request = new (std::nothrow) SampledRequest;
if (NULL == sampled_request) {
cntl->SetFailed(ENOMEM, "Fail to get sampled_request");
break;
}
sampled_request->meta.set_service_name(request_meta.service_name());
sampled_request->meta.set_method_name(request_meta.method_name());
cntl->reset_sampled_request(sampled_request);
// Switch to service-specific error.
non_service_error.release();
method_status = server->options().baidu_master_service->_status;
if (method_status) {
int rejected_cc = 0;
if (!method_status->OnRequested(&rejected_cc, cntl.get())) {
cntl->SetFailed(
ELIMIT,
"Rejected by %s's ConcurrencyLimiter, concurrency=%d",
butil::class_name<BaiduMasterService>(), rejected_cc);
break;
}
}
if (span) {
span->ResetServerSpanName(sampled_request->meta.method_name());
}
messages = BaiduProxyPBMessages::Get();
msg->payload.cutn(
&((SerializedRequest*)messages->Request())->serialized_data(),
req_size - meta.attachment_size());
if (!msg->payload.empty()) {
cntl->request_attachment().swap(msg->payload);
}
} else {
// NOTE(gejun): jprotobuf sends service names without packages. So the
// name should be changed to full when it's not.
butil::StringPiece svc_name(request_meta.service_name());
if (svc_name.find('.') == butil::StringPiece::npos) {
const Server::ServiceProperty* sp =
server_accessor.FindServicePropertyByName(svc_name);
if (NULL == sp) {
cntl->SetFailed(ENOSERVICE, "Fail to find service=%s",
request_meta.service_name().c_str());
break;
}
svc_name = sp->service->GetDescriptor()->full_name();
}
const Server::MethodProperty* mp =
server_accessor.FindMethodPropertyByFullName(
svc_name, request_meta.method_name());
if (NULL == mp) {
cntl->SetFailed(ENOMETHOD, "Fail to find method=%s/%s",
request_meta.service_name().c_str(),
request_meta.method_name().c_str());
break;
} else if (mp->service->GetDescriptor() == BadMethodService::descriptor()) {
BadMethodRequest breq;
BadMethodResponse bres;
breq.set_service_name(request_meta.service_name());
mp->service->CallMethod(mp->method, cntl.get(), &breq, &bres, NULL);
break;
}
if (socket->is_overcrowded() &&
!server->options().ignore_eovercrowded &&
!mp->ignore_eovercrowded) {
cntl->SetFailed(
EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
break;
}
// Switch to service-specific error.
non_service_error.release();
method_status = mp->status;
if (method_status) {
int rejected_cc = 0;
if (!method_status->OnRequested(&rejected_cc, cntl.get())) {
cntl->SetFailed(
ELIMIT,
"Rejected by %s's ConcurrencyLimiter, concurrency=%d",
mp->method->full_name().c_str(), rejected_cc);
break;
}
}
svc = mp->service;
method = const_cast<google::protobuf::MethodDescriptor*>(mp->method);
accessor.set_method(method);
if (span) {
span->ResetServerSpanName(method->full_name());
}
if (!server->AcceptRequest(cntl.get())) {
break;
}
butil::IOBuf req_buf;
int body_without_attachment_size = req_size - meta.attachment_size();
msg->payload.cutn(&req_buf, body_without_attachment_size);
if (meta.attachment_size() > 0) {
cntl->request_attachment().swap(msg->payload);
}
ContentType content_type = meta.content_type();
auto compress_type = static_cast<CompressType>(meta.compress_type());
messages = server->options().rpc_pb_message_factory->Get(*svc, *method);
if (!DeserializeRpcMessage(req_buf, *cntl, content_type,
compress_type, messages->Request())) {
cntl->SetFailed(
EREQUEST, "Fail to parse request=%s, ContentType=%s, "
"CompressType=%s, request_size=%d",
messages->Request()->GetDescriptor()->full_name().c_str(),
ContentTypeToCStr(content_type),
CompressTypeToCStr(compress_type), req_size);
break;
}
req_buf.clear();
}
// `socket' will be held until response has been sent
google::protobuf::Closure* done = ::brpc::NewCallback<
int64_t, Controller*, RpcPBMessages*,
const Server*, MethodStatus*, int64_t>(
&SendRpcResponse, meta.correlation_id(),cntl.get(),
messages, server, method_status, msg->received_us());
// optional, just release resource ASAP
msg.reset();
if (span) {
span->set_start_callback_us(butil::cpuwide_time_us());
span->AsParent();
}
if (!FLAGS_usercode_in_pthread) {
return svc->CallMethod(method, cntl.release(),
messages->Request(),
messages->Response(), done);
}
if (BeginRunningUserCode()) {
svc->CallMethod(method, cntl.release(),
messages->Request(),
messages->Response(), done);
return EndRunningUserCodeInPlace();
} else {
return EndRunningCallMethodInPool(
svc, method, cntl.release(),
messages->Request(),
messages->Response(), done);
}
} while (false);
// `cntl', `req' and `res' will be deleted inside `SendRpcResponse'
// `socket' will be held until response has been sent
SendRpcResponse(meta.correlation_id(),
cntl.release(), messages,
server, method_status,
msg->received_us());
}