in src/brpc/policy/mongo_protocol.cpp [173:295]
// Handles one parsed Mongo request on the server side.
//
// `msg_base` is expected to be a MostCommonMessage whose `meta` holds the
// mongo_head_t header and whose `payload` holds the request body; its arg()
// is the owning Server. The function builds a SendMongoResponse closure,
// fills in the controller/request from the header and payload, and
// dispatches to the single method of MongoService registered on the server.
//
// On any pre-dispatch failure the controller is marked failed and the
// closure is run directly (mongo_done->Run()), so a response is still
// produced. On success the closure is handed to the service as `done`.
void ProcessMongoRequest(InputMessageBase* msg_base) {
    DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base));
    SocketUniquePtr socket_guard(msg->ReleaseSocket());
    Socket* socket = socket_guard.get();
    const Server* server = static_cast<const Server*>(msg_base->arg());
    // Errors are attributed to the server as non-service errors until a
    // method is resolved (see release() below).
    ScopedNonServiceError non_service_error(server);

    // Read the fixed-size header out of `meta`; `buf` is scratch space that
    // fetch() may use for the bytes it returns.
    // NOTE(review): assumes the parser guarantees `meta` holds at least
    // sizeof(mongo_head_t) bytes, otherwise `header` may be NULL - confirm.
    char buf[sizeof(mongo_head_t)];
    const char *p = (const char *)msg->meta.fetch(buf, sizeof(buf));
    const mongo_head_t *header = (const mongo_head_t*)p;

    const google::protobuf::ServiceDescriptor* srv_des = MongoService::descriptor();
    if (1 != srv_des->method_count()) {
        LOG(WARNING) << "method count:" << srv_des->method_count()
                     << " of MongoService should be equal to 1!";
    }

    // All Mongo requests are routed to method(0) of MongoService.
    const Server::MethodProperty *mp =
            ServerPrivateAccessor(server)
            .FindMethodPropertyByFullName(srv_des->method(0)->full_name());

    MongoContextMessage *context_msg =
        dynamic_cast<MongoContextMessage*>(socket->parsing_context());
    if (NULL == context_msg) {
        // Nothing has been allocated yet, so a plain return is safe here.
        LOG(WARNING) << "socket context wasn't set correctly";
        return;
    }

    // From here on every exit path must either hand `mongo_done` to the
    // service as `done` or call mongo_done->Run() itself.
    SendMongoResponse* mongo_done = new SendMongoResponse(server);
    mongo_done->cntl.set_mongo_session_data(context_msg->context());

    ControllerPrivateAccessor accessor(&(mongo_done->cntl));
    accessor.set_server(server)
        .set_security_mode(server->options().security_mode())
        .set_peer_id(socket->id())
        .set_remote_side(socket->remote_side())
        .set_local_side(socket->local_side())
        .set_auth_context(socket->auth_context())
        .set_request_protocol(PROTOCOL_MONGO)
        .set_begin_time_us(msg->received_us())
        .move_in_server_receiving_sock(socket_guard);

    // Tag the bthread with this server's key for
    // thread_local_data().
    if (server->thread_local_options().thread_local_data_factory) {
        bthread_assign_data((void*)&server->thread_local_options());
    }

    // Single-pass loop: `break` jumps to the failure path after the loop.
    do {
        if (!server->IsRunning()) {
            mongo_done->cntl.SetFailed(ELOGOFF, "Server is stopping");
            break;
        }

        if (!ServerPrivateAccessor(server).AddConcurrency(&(mongo_done->cntl))) {
            mongo_done->cntl.SetFailed(
                ELIMIT, "Reached server's max_concurrency=%d",
                server->options().max_concurrency);
            break;
        }
        if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
            mongo_done->cntl.SetFailed(ELIMIT, "Too many user code to run when"
                                       " -usercode_in_pthread is on");
            break;
        }

        if (NULL == mp ||
            mp->service->GetDescriptor() == BadMethodService::descriptor()) {
            mongo_done->cntl.SetFailed(ENOMETHOD, "Fail to find default_method");
            break;
        }
        // Switch to service-specific error.
        non_service_error.release();
        MethodStatus* method_status = mp->status;
        mongo_done->status = method_status;
        if (method_status) {
            int rejected_cc = 0;
            if (!method_status->OnRequested(&rejected_cc)) {
                mongo_done->cntl.SetFailed(
                    ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
                    mp->method->full_name().c_str(), rejected_cc);
                break;
            }
        }

        if (!MongoOp_IsValid(header->op_code)) {
            mongo_done->cntl.SetFailed(EREQUEST, "Unknown op_code:%d", header->op_code);
            break;
        }

        // Copy the wire header and body into the request, and pre-fill the
        // response so that it answers this request id.
        mongo_done->cntl.set_log_id(header->request_id);
        const std::string &body_str = msg->payload.to_string();
        mongo_done->req.set_message(body_str.c_str(), body_str.size());
        mongo_done->req.mutable_header()->set_message_length(header->message_length);
        mongo_done->req.mutable_header()->set_request_id(header->request_id);
        mongo_done->req.mutable_header()->set_response_to(header->response_to);
        mongo_done->req.mutable_header()->set_op_code(
                static_cast<MongoOp>(header->op_code));
        mongo_done->res.mutable_header()->set_response_to(header->request_id);
        mongo_done->received_us = msg->received_us();

        google::protobuf::Service* svc = mp->service;
        const google::protobuf::MethodDescriptor* method = mp->method;
        accessor.set_method(method);

        if (!FLAGS_usercode_in_pthread) {
            return svc->CallMethod(
                method, &(mongo_done->cntl), &(mongo_done->req),
                &(mongo_done->res), mongo_done);
        }
        if (BeginRunningUserCode()) {
            // Do NOT `return` the CallMethod call: EndRunningUserCodeInPlace()
            // must run afterwards to pair with the successful
            // BeginRunningUserCode() above. Returning early left it
            // unreachable.
            svc->CallMethod(
                method, &(mongo_done->cntl), &(mongo_done->req),
                &(mongo_done->res), mongo_done);
            return EndRunningUserCodeInPlace();
        } else {
            return EndRunningCallMethodInPool(
                svc, method, &(mongo_done->cntl), &(mongo_done->req),
                &(mongo_done->res), mongo_done);
        }
    } while (false);

    // Failure path: the controller has been marked failed above; run the
    // closure directly so the error is reported.
    mongo_done->Run();
}