in src/brpc/policy/hulu_pbrpc_protocol.cpp [341:547]
void ProcessHuluRequest(InputMessageBase* msg_base) {
const int64_t start_parse_us = butil::cpuwide_time_us();
DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base));
SocketUniquePtr socket_guard(msg->ReleaseSocket());
Socket* socket = socket_guard.get();
const Server* server = static_cast<const Server*>(msg_base->arg());
ScopedNonServiceError non_service_error(server);
HuluRpcRequestMeta meta;
if (!ParsePbFromIOBuf(&meta, msg->meta)) {
LOG(WARNING) << "Fail to parse HuluRpcRequestMeta, close the connection";
socket->SetFailed();
return;
}
const CompressType req_cmp_type = Hulu2CompressType((HuluCompressType)meta.compress_type());
SampledRequest* sample = AskToBeSampled();
if (sample) {
sample->meta.set_service_name(meta.service_name());
sample->meta.set_method_index(meta.method_index());
sample->meta.set_compress_type(req_cmp_type);
sample->meta.set_protocol_type(PROTOCOL_HULU_PBRPC);
sample->meta.set_user_data(meta.user_data());
if (meta.has_user_message_size()
&& static_cast<size_t>(meta.user_message_size()) < msg->payload.size()) {
size_t attachment_size = msg->payload.size() - meta.user_message_size();
sample->meta.set_attachment_size(attachment_size);
}
sample->request = msg->payload;
sample->submit(start_parse_us);
}
std::unique_ptr<HuluController> cntl(new (std::nothrow) HuluController());
if (NULL == cntl.get()) {
LOG(WARNING) << "Fail to new Controller";
return;
}
std::unique_ptr<google::protobuf::Message> req;
std::unique_ptr<google::protobuf::Message> res;
ServerPrivateAccessor server_accessor(server);
ControllerPrivateAccessor accessor(cntl.get());
int64_t correlation_id = meta.correlation_id();
const bool security_mode = server->options().security_mode() &&
socket->user() == server_accessor.acceptor();
if (meta.has_log_id()) {
cntl->set_log_id(meta.log_id());
}
cntl->set_request_compress_type(req_cmp_type);
accessor.set_server(server)
.set_security_mode(security_mode)
.set_peer_id(socket->id())
.set_remote_side(socket->remote_side())
.set_local_side(socket->local_side())
.set_auth_context(socket->auth_context())
.set_request_protocol(PROTOCOL_HULU_PBRPC)
.set_begin_time_us(msg->received_us())
.move_in_server_receiving_sock(socket_guard);
if (meta.has_user_data()) {
cntl->set_request_user_data(meta.user_data());
}
if (meta.has_user_defined_source_addr()) {
cntl->set_request_source_addr(meta.user_defined_source_addr());
}
// Tag the bthread with this server's key for thread_local_data().
if (server->thread_local_options().thread_local_data_factory) {
bthread_assign_data((void*)&server->thread_local_options());
}
Span* span = NULL;
if (IsTraceable(meta.has_trace_id())) {
span = Span::CreateServerSpan(
meta.trace_id(), meta.span_id(), meta.parent_span_id(),
msg->base_real_us());
accessor.set_span(span);
span->set_log_id(meta.log_id());
span->set_remote_side(cntl->remote_side());
span->set_protocol(PROTOCOL_HULU_PBRPC);
span->set_received_us(msg->received_us());
span->set_start_parse_us(start_parse_us);
span->set_request_size(msg->payload.size() + msg->meta.size() + 12);
}
MethodStatus* method_status = NULL;
do {
if (!server->IsRunning()) {
cntl->SetFailed(ELOGOFF, "Server is stopping");
break;
}
if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency);
break;
}
if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
cntl->SetFailed(ELIMIT, "Too many user code to run when"
" -usercode_in_pthread is on");
break;
}
const Server::MethodProperty *sp =
server_accessor.FindMethodPropertyByNameAndIndex(
meta.service_name(), meta.method_index());
if (NULL == sp) {
cntl->SetFailed(ENOMETHOD, "Fail to find method=%d of service=%s",
meta.method_index(), meta.service_name().c_str());
break;
} else if (sp->service->GetDescriptor()
== BadMethodService::descriptor()) {
BadMethodRequest breq;
BadMethodResponse bres;
breq.set_service_name(meta.service_name());
sp->service->CallMethod(sp->method, cntl.get(), &breq, &bres, NULL);
break;
}
if (socket->is_overcrowded() &&
!server->options().ignore_eovercrowded &&
!sp->ignore_eovercrowded) {
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
break;
}
// Switch to service-specific error.
non_service_error.release();
method_status = sp->status;
if (method_status) {
int rejected_cc = 0;
if (!method_status->OnRequested(&rejected_cc)) {
cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
sp->method->full_name().c_str(), rejected_cc);
break;
}
}
google::protobuf::Service* svc = sp->service;
const google::protobuf::MethodDescriptor* method = sp->method;
accessor.set_method(method);
if (!server->AcceptRequest(cntl.get())) {
break;
}
if (span) {
span->ResetServerSpanName(method->full_name());
}
const int reqsize = msg->payload.length();
butil::IOBuf req_buf;
butil::IOBuf* req_buf_ptr = &msg->payload;
if (meta.has_user_message_size()) {
msg->payload.cutn(&req_buf, meta.user_message_size());
req_buf_ptr = &req_buf;
cntl->request_attachment().swap(msg->payload);
}
req.reset(svc->GetRequestPrototype(method).New());
if (!ParseFromCompressedData(*req_buf_ptr, req.get(), req_cmp_type)) {
cntl->SetFailed(EREQUEST, "Fail to parse request message, "
"CompressType=%s, request_size=%d",
CompressTypeToCStr(req_cmp_type), reqsize);
break;
}
res.reset(svc->GetResponsePrototype(method).New());
// `socket' will be held until response has been sent
google::protobuf::Closure* done = ::brpc::NewCallback<
int64_t, HuluController*, const google::protobuf::Message*,
const google::protobuf::Message*, const Server*,
MethodStatus *, int64_t>(
&SendHuluResponse, correlation_id, cntl.get(),
req.get(), res.get(), server,
method_status, msg->received_us());
// optional, just release resource ASAP
msg.reset();
req_buf.clear();
if (span) {
span->set_start_callback_us(butil::cpuwide_time_us());
span->AsParent();
}
if (!FLAGS_usercode_in_pthread) {
return svc->CallMethod(method, cntl.release(),
req.release(), res.release(), done);
}
if (BeginRunningUserCode()) {
svc->CallMethod(method, cntl.release(),
req.release(), res.release(), done);
return EndRunningUserCodeInPlace();
} else {
return EndRunningCallMethodInPool(
svc, method, cntl.release(),
req.release(), res.release(), done);
}
} while (false);
// `cntl', `req' and `res' will be deleted inside `SendHuluResponse'
// `socket' will be held until response has been sent
SendHuluResponse(correlation_id, cntl.release(),
req.release(), res.release(), server,
method_status, msg->received_us());
}