in src/brpc/policy/ubrpc2pb_protocol.cpp [45:167]
void UbrpcAdaptor::ParseNsheadMeta(
const Server&, const NsheadMessage& request, Controller* cntl,
NsheadMeta* out_meta) const {
butil::IOBufAsZeroCopyInputStream zc_stream(request.body);
mcpack2pb::InputStream stream(&zc_stream);
if (!::mcpack2pb::unbox(&stream)) {
cntl->SetFailed(EREQUEST, "Request is not a compack/mcpack2 object");
return;
}
mcpack2pb::ObjectIterator it1(&stream, request.body.size() - stream.popped_bytes());
bool found_content = false;
for (; it1 != NULL; ++it1) {
if (it1->name == "content") {
found_content = true;
break;
}
}
if (!found_content) {
cntl->SetFailed(EREQUEST, "Fail to find request.content");
return;
}
if (it1->value.type() != mcpack2pb::FIELD_ARRAY) {
cntl->SetFailed(EREQUEST, "Expect request.content to be array, "
"actually %s", mcpack2pb::type2str(it1->value.type()));
return;
}
mcpack2pb::ArrayIterator it2(it1->value);
if (it2 == NULL) {
cntl->SetFailed(EREQUEST, "Fail to parse request.content as array");
return;
}
std::string service_name;
std::string method_name;
bool has_params = false;
size_t user_req_offset = 0;
size_t user_req_size = 0;
for (mcpack2pb::ObjectIterator it3(*it2); it3 != NULL; ++it3) {
if (it3->name == "service_name") {
if (it3->value.type() != mcpack2pb::FIELD_STRING) {
cntl->SetFailed(EREQUEST, "Expect request.content[0].service_name"
" to be string, actually %s",
mcpack2pb::type2str(it3->value.type()));
return;
}
it3->value.as_string(&service_name, "request.content[0].service_name");
} else if (it3->name == "method") {
if (it3->value.type() != mcpack2pb::FIELD_STRING) {
cntl->SetFailed(EREQUEST, "Expect request.content[0].method"
" to be string, actually %s",
mcpack2pb::type2str(it3->value.type()));
return;
}
it3->value.as_string(&method_name, "request.content[0].method");
} else if (it3->name == "id") {
if (!mcpack2pb::is_primitive(it3->value.type()) ||
!mcpack2pb::is_integral((mcpack2pb::PrimitiveFieldType)it3->value.type())) {
cntl->SetFailed(ERESPONSE, "request.content[0].id must be "
"integer, actually %s",
mcpack2pb::type2str(it3->value.type()));
return;
}
out_meta->set_correlation_id(
it3->value.as_int64("request.content[0].id"));
} else if (it3->name == "params") {
if (it3->value.type() != mcpack2pb::FIELD_OBJECT) {
cntl->SetFailed(EREQUEST, "Expect request.content[0].params "
"to be object, actually %s",
mcpack2pb::type2str(it3->value.type()));
return;
}
has_params = true;
user_req_offset = stream.popped_bytes();
user_req_size = it3->value.size();
const size_t stream_end = stream.popped_bytes() + it3->value.size();
mcpack2pb::ObjectIterator it4(it3->value);
if (it4 == NULL || it4.field_count() == 0) {
cntl->SetFailed(EREQUEST, "Nothing in request.content[0].params");
return;
}
if (it4.field_count() == 1) {
user_req_offset = stream.popped_bytes();
user_req_size = it4->value.size();
}
// Pop left bytes, otherwise ++it3 may complain about
// "not fully consumed".
if (stream_end > stream.popped_bytes()) {
stream.popn(stream_end - stream.popped_bytes());
}
}
}
if (service_name.empty()) {
cntl->SetFailed(EREQUEST, "Fail to find request.content[0].service_name");
return;
}
if (method_name.empty()) {
cntl->SetFailed(EREQUEST, "Fail to find request.content[0].method");
return;
}
if (!has_params) {
cntl->SetFailed(EREQUEST, "Fail to find request.content[0].params");
return;
}
// Change request.body with the user's request.
butil::IOBuf& buf = const_cast<butil::IOBuf&>(request.body);
buf.pop_front(user_req_offset);
if (buf.size() != user_req_size) {
if (buf.size() < user_req_size) {
cntl->SetFailed(EREQUEST, "request_size=%" PRIu64 " is shorter than"
"specified=%" PRIu64, (uint64_t)buf.size(),
(uint64_t)user_req_size);
return;
}
buf.pop_back(buf.size() - user_req_size);
}
std::string full_method_name;
full_method_name.reserve(service_name.size() + 1 + method_name.size());
full_method_name.append(service_name);
full_method_name.push_back('.');
full_method_name.append(method_name);
out_meta->set_full_method_name(full_method_name);
}