in src/brpc/policy/http_rpc_protocol.cpp [1403:1720]
// Server-side handler for one parsed HTTP or h2 request.
//
// Builds a Controller from `msg', resolves the target service method (either
// ServerOptions.http_master_service or a lookup by URI path), applies the
// concurrency / overcrowding / security checks, optionally converts the body
// into the protobuf request message, and finally invokes the method.
//
// `msg' is downcast to HttpContext (and to H2StreamContext when its header
// reports http2) and is held by `imsg_guard' during this call. msg->arg() is
// interpreted as the `const Server*' serving the request.
//
// NOTE(review): every early `return' that follows cntl->SetFailed() leaves
// without touching `cntl' or sending anything explicitly. Presumably the
// destructor of `resp_sender' sends the response and disposes of `cntl' on
// those paths -- confirm against HttpResponseSender. Because of this, the
// order of statements in this function matters.
void ProcessHttpRequest(InputMessageBase *msg) {
// Taken before any processing; later recorded on the span and passed to
// sample->submit().
const int64_t start_parse_us = butil::cpuwide_time_us();
// Take hold of the message and of the socket released from it.
DestroyingPtr<HttpContext> imsg_guard(static_cast<HttpContext*>(msg));
SocketUniquePtr socket_guard(imsg_guard->ReleaseSocket());
Socket* socket = socket_guard.get();
const Server* server = static_cast<const Server*>(msg->arg());
// Armed until non_service_error.release() is called below, i.e. until a
// real service method has been resolved.
ScopedNonServiceError non_service_error(server);
// nothrow new: allocation failure is reported through the NULL check below.
// Note that nothing is sent back to the client on that path.
Controller* cntl = new (std::nothrow) Controller;
if (NULL == cntl) {
LOG(FATAL) << "Fail to new Controller";
return;
}
HttpResponseSender resp_sender(cntl);
resp_sender.set_received_us(msg->received_us());
// Must be read before the header is swapped into `cntl' below.
const bool is_http2 = imsg_guard->header().is_http2();
if (is_http2) {
// For h2 the message is additionally an H2StreamContext; remember its
// stream id on the sender.
H2StreamContext* h2_sctx = static_cast<H2StreamContext*>(msg);
resp_sender.set_h2_stream_id(h2_sctx->stream_id());
}
ControllerPrivateAccessor accessor(cntl);
// Swap (rather than copy) the parsed header into cntl->http_request().
// From here on `req_header' is the request header; imsg_guard->header()
// holds whatever cntl->http_request() contained before the swap.
HttpHeader& req_header = cntl->http_request();
imsg_guard->header().Swap(req_header);
// `req_body' aliases the body stored inside the message.
butil::IOBuf& req_body = imsg_guard->body();
// Prefer the user address carried in the header; fall back to the socket's
// remote side when GetUserAddressFromHeader() returns false.
butil::EndPoint user_addr;
if (!GetUserAddressFromHeader(req_header, &user_addr)) {
user_addr = socket->remote_side();
}
ServerPrivateAccessor server_accessor(server);
// Security mode applies only when the server option is on AND the socket's
// user is this server's acceptor.
const bool security_mode = server->options().security_mode() &&
socket->user() == server_accessor.acceptor();
// Fill in the controller's internal fields.
// NOTE(review): judging by its name, move_in_server_receiving_sock() takes
// the socket over from `socket_guard'; the raw `socket' pointer is still
// used further down, so it presumably stays valid through `cntl' -- confirm.
accessor.set_server(server)
.set_security_mode(security_mode)
.set_peer_id(socket->id())
.set_remote_side(user_addr)
.set_local_side(socket->local_side())
.set_auth_context(socket->auth_context())
.set_request_protocol(is_http2 ? PROTOCOL_H2 : PROTOCOL_HTTP)
.set_begin_time_us(msg->received_us())
.move_in_server_receiving_sock(socket_guard);
// Read log-id. errno may be set when input to strtoull overflows.
// atoi/atol/atoll don't support 64-bit integer and can't be used.
// A value with trailing unparsed characters or one that sets errno is
// logged and ignored; the request itself is still processed.
const std::string* log_id_str = req_header.GetHeader(common->LOG_ID);
if (log_id_str) {
char* logid_end = NULL;
errno = 0;
uint64_t logid = strtoull(log_id_str->c_str(), &logid_end, 10);
if (*logid_end || errno) {
LOG(ERROR) << "Invalid " << common->LOG_ID << '='
<< *log_id_str << " in http request";
} else {
cntl->set_log_id(logid);
}
}
// Copy the request id from the header named by FLAGS_request_id_header,
// if present.
const std::string* request_id = req_header.GetHeader(FLAGS_request_id_header);
if (request_id) {
cntl->set_request_id(*request_id);
}
// Tag the bthread with this server's key for
// thread_local_data().
if (server->thread_local_options().thread_local_data_factory) {
bthread_assign_data((void*)&server->thread_local_options());
}
// Tracing: when IsTraceable() approves, create a server span from the
// x-bd-* headers. Each id defaults to 0 when its header is absent.
// NOTE(review): unlike the log-id above, these strtoull() results are not
// validated (no end pointer, no errno check) -- presumably best-effort.
Span* span = NULL;
const std::string& path = req_header.uri().path();
const std::string* trace_id_str = req_header.GetHeader("x-bd-trace-id");
if (IsTraceable(trace_id_str)) {
uint64_t trace_id = 0;
if (trace_id_str) {
trace_id = strtoull(trace_id_str->c_str(), NULL, 10);
}
uint64_t span_id = 0;
const std::string* span_id_str = req_header.GetHeader("x-bd-span-id");
if (span_id_str) {
span_id = strtoull(span_id_str->c_str(), NULL, 10);
}
uint64_t parent_span_id = 0;
const std::string* parent_span_id_str =
req_header.GetHeader("x-bd-parent-span-id");
if (parent_span_id_str) {
parent_span_id = strtoull(parent_span_id_str->c_str(), NULL, 10);
}
// The span is initially named after the URI path; it is renamed to the
// method's full name once the method is known.
span = Span::CreateServerSpan(
path, trace_id, span_id, parent_span_id, msg->base_real_us());
accessor.set_span(span);
span->set_log_id(cntl->log_id());
span->set_remote_side(user_addr);
span->set_received_us(msg->received_us());
span->set_start_parse_us(start_parse_us);
span->set_protocol(is_http2 ? PROTOCOL_H2 : PROTOCOL_HTTP);
span->set_request_size(imsg_guard->parsed_length());
}
// Refuse new requests when the server is not running.
if (!server->IsRunning()) {
cntl->SetFailed(ELOGOFF, "Server is stopping");
return;
}
if (server->options().http_master_service) {
// If http_master_service is on, just call it.
// This path skips the URI lookup and all checks further below: the
// whole body becomes the request attachment and no request/response
// messages are passed to CallMethod (both NULL).
google::protobuf::Service* svc = server->options().http_master_service;
const google::protobuf::MethodDescriptor* md =
svc->GetDescriptor()->FindMethodByName(common->DEFAULT_METHOD);
if (md == NULL) {
cntl->SetFailed(ENOMETHOD, "No default_method in http_master_service");
return;
}
accessor.set_method(md);
cntl->request_attachment().swap(req_body);
google::protobuf::Closure* done = new HttpResponseSenderAsDone(&resp_sender);
if (span) {
span->ResetServerSpanName(md->full_name());
span->set_start_callback_us(butil::cpuwide_time_us());
span->AsParent();
}
// `cntl', `req' and `res' will be deleted inside `done'
return svc->CallMethod(md, cntl, NULL, NULL, done);
}
// Resolve the method from the URI path. &req_header._unresolved_path is
// passed as an output argument (presumably receives the part of the path
// not consumed by the match -- confirm in FindMethodPropertyByURI).
const Server::MethodProperty* const mp =
FindMethodPropertyByURI(path, server, &req_header._unresolved_path);
if (NULL == mp) {
// In security mode the path is web-escaped before being placed into
// the error text; otherwise it is used verbatim.
if (security_mode) {
std::string escape_path;
WebEscape(path, &escape_path);
cntl->SetFailed(ENOMETHOD, "Fail to find method on `%s'", escape_path.c_str());
} else {
cntl->SetFailed(ENOMETHOD, "Fail to find method on `%s'", path.c_str());
}
return;
} else if (mp->service->GetDescriptor() == BadMethodService::descriptor()) {
// The path resolved to BadMethodService: call it synchronously with a
// NULL done. service_name is the field the splitter is positioned at
// right after construction (presumably the first '/'-separated segment
// of the path).
BadMethodRequest breq;
BadMethodResponse bres;
butil::StringSplitter split(path.c_str(), '/');
breq.set_service_name(std::string(split.field(), split.length()));
mp->service->CallMethod(mp->method, cntl, &breq, &bres, NULL);
return;
}
// Switch to service-specific error.
non_service_error.release();
MethodStatus* method_status = mp->status;
resp_sender.set_method_status(method_status);
// Per-method admission: OnRequested() returning false rejects the request
// with ELIMIT and reports the value written to rejected_cc.
if (method_status) {
int rejected_cc = 0;
if (!method_status->OnRequested(&rejected_cc)) {
cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
mp->method->full_name().c_str(), rejected_cc);
return;
}
}
if (span) {
span->ResetServerSpanName(mp->method->full_name());
}
// NOTE: accesses to builtin services are not counted as part of
// concurrency, therefore are not limited by ServerOptions.max_concurrency.
if (!mp->is_builtin_service && !mp->params.is_tabbed) {
// An overcrowded connection is rejected unless either the server-wide
// or the per-method ignore_eovercrowded switch is set.
if (socket->is_overcrowded() &&
!server->options().ignore_eovercrowded &&
!mp->ignore_eovercrowded) {
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
return;
}
if (!server_accessor.AddConcurrency(cntl)) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency);
return;
}
if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
cntl->SetFailed(ELIMIT, "Too many user code to run when"
" -usercode_in_pthread is on");
return;
}
// NOTE(review): no SetFailed() here; presumably AcceptRequest() marks
// `cntl' as failed itself when it returns false -- confirm.
if (!server->AcceptRequest(cntl)) {
return;
}
} else if (security_mode) {
// Reached for builtin services and for methods with params.is_tabbed:
// both are refused in security mode.
cntl->SetFailed(EPERM, "Not allowed to access builtin services, try "
"ServerOptions.internal_port=%d instead if you're in"
" internal network", server->options().internal_port);
return;
}
google::protobuf::Service* svc = mp->service;
const google::protobuf::MethodDescriptor* method = mp->method;
accessor.set_method(method);
// Obtain the request/response messages from the configured factory and
// register them with the sender.
// NOTE(review): `messages' is dereferenced without a NULL check; this
// assumes the factory's Get() never returns NULL -- confirm.
RpcPBMessages* messages = server->options().rpc_pb_message_factory->Get(*svc, *method);;
resp_sender.set_messages(messages);
google::protobuf::Message* req = messages->Request();
google::protobuf::Message* res = messages->Response();
// __builtin_expect(..., 0): the failure branch is hinted as unlikely.
if (__builtin_expect(!req || !res, 0)) {
PLOG(FATAL) << "Fail to new req or res";
cntl->SetFailed("Fail to new req or res");
return;
}
if (mp->params.allow_http_body_to_pb &&
method->input_type()->field_count() > 0) {
// A protobuf service. No matter if Content-type is set to
// application/json or body is empty, we have to treat body as a json
// and try to convert it to pb, which guarantees that a protobuf
// service is always accessed with valid requests.
if (req_body.empty()) {
// Treat empty body specially since parsing it results in error
if (!req->IsInitialized()) {
cntl->SetFailed(EREQUEST, "%s needs to be created from a"
" non-empty json, it has required fields.",
req->GetDescriptor()->full_name().c_str());
return;
} // else all fields of the request are optional.
} else {
bool is_grpc_ct = false;
const HttpContentType content_type =
ParseContentType(req_header.content_type(), &is_grpc_ct);
// Name of the body encoding to undo, if any; stays NULL when the
// relevant header is absent or the gRPC payload is not compressed.
const std::string* encoding = NULL;
if (is_http2 && is_grpc_ct) {
// gRPC over h2: strip the gRPC prefix from the body, which also
// reports whether the payload is compressed.
bool grpc_compressed = false;
if (!RemoveGrpcPrefix(&req_body, &grpc_compressed)) {
cntl->SetFailed(EREQUEST, "Invalid gRPC request");
return;
}
if (grpc_compressed) {
// A compressed payload must name its encoding.
encoding = req_header.GetHeader(common->GRPC_ENCODING);
if (encoding == NULL) {
cntl->SetFailed(
EREQUEST, "Fail to find header `grpc-encoding'"
" in compressed gRPC request");
return;
}
}
// A non-negative converted timeout becomes a deadline relative
// to the current time; a negative result leaves the deadline
// untouched.
int64_t timeout_value_us =
ConvertGrpcTimeoutToUS(req_header.GetHeader(common->GRPC_TIMEOUT));
if (timeout_value_us >= 0) {
accessor.set_deadline_us(
butil::gettimeofday_us() + timeout_value_us);
}
} else { // http or h2 but not grpc
encoding = req_header.GetHeader(common->CONTENT_ENCODING);
}
// Only the gzip encoding is decoded here; a body with any other
// encoding value is passed to the parsers below unchanged.
if (encoding != NULL && *encoding == common->GZIP) {
TRACEPRINTF("Decompressing request=%lu",
(unsigned long)req_body.size());
butil::IOBuf uncompressed;
if (!policy::GzipDecompress(req_body, &uncompressed)) {
cntl->SetFailed(EREQUEST, "Fail to un-gzip request body");
return;
}
req_body.swap(uncompressed);
}
// Pick the parser by content type; anything that is not one of the
// three proto content types falls through to plain JSON.
if (content_type == HTTP_CONTENT_PROTO) {
if (!ParsePbFromIOBuf(req, req_body)) {
cntl->SetFailed(EREQUEST, "Fail to parse http body as %s",
req->GetDescriptor()->full_name().c_str());
return;
}
} else if (content_type == HTTP_CONTENT_PROTO_TEXT) {
if (!ParsePbTextFromIOBuf(req, req_body)) {
cntl->SetFailed(EREQUEST, "Fail to parse http proto-text body as %s",
req->GetDescriptor()->full_name().c_str());
return;
}
} else if (content_type == HTTP_CONTENT_PROTO_JSON) {
// NOTE(review): no SetFailed() on failure here or in the JSON
// branch below; the converters receive `cntl' and EREQUEST and
// presumably set the error themselves -- confirm.
if (!ProtoJsonToProtoMessage(req_body, req, cntl, EREQUEST)) {
return;
}
} else {
// Copy the per-method JSON conversion options onto the
// controller before converting.
cntl->set_pb_bytes_to_base64(mp->params.pb_bytes_to_base64);
cntl->set_pb_single_repeated_to_array(mp->params.pb_single_repeated_to_array);
if (!JsonToProtoMessage(req_body, req, cntl, EREQUEST)) {
return;
}
}
}
// Request sampling is attempted for non-h2 requests only. At this point
// `req_body' is the body after the optional gzip decoding above, and
// `ep' is passed default-constructed.
if (!is_http2) {
SampledRequest* sample = AskToBeSampled();
if (sample) {
sample->meta.set_compress_type(COMPRESS_TYPE_NONE);
sample->meta.set_protocol_type(PROTOCOL_HTTP);
sample->meta.set_attachment_size(req_body.size());
butil::EndPoint ep;
MakeRawHttpRequest(&sample->request, &req_header, ep, &req_body);
sample->submit(start_parse_us);
}
}
} else {
// The body is not converted to pb: either expose the message for
// progressive reading or hand the body over as the request attachment.
if (imsg_guard->read_body_progressively()) {
// NOTE(review): imsg_guard is reset right below; presumably the
// accessor takes its own reference to the message -- confirm.
accessor.set_readable_progressive_attachment(imsg_guard.get());
} else {
// A http server, just keep content as it is.
cntl->request_attachment().swap(req_body);
}
}
// From here on the response is driven by `done', which is constructed from
// `resp_sender'. `req_body' must not be used after imsg_guard.reset() since
// it refers to the body inside the message.
google::protobuf::Closure* done = new HttpResponseSenderAsDone(&resp_sender);
imsg_guard.reset(); // optional, just release resource ASAP
if (span) {
span->set_start_callback_us(butil::cpuwide_time_us());
span->AsParent();
}
// Without -usercode_in_pthread the method is called directly.
if (!FLAGS_usercode_in_pthread) {
return svc->CallMethod(method, cntl, req, res, done);
}
// With -usercode_in_pthread: run the method in place when
// BeginRunningUserCode() allows it, otherwise defer the call through
// EndRunningCallMethodInPool() (a pool, judging by the name).
if (BeginRunningUserCode()) {
svc->CallMethod(method, cntl, req, res, done);
return EndRunningUserCodeInPlace();
} else {
return EndRunningCallMethodInPool(svc, method, cntl, req, res, done);
}
}