in src/brpc/policy/ubrpc2pb_protocol.cpp [276:438]
// Locate the user response inside a compack/mcpack2 reply stored in `buf`
// and parse it into `res`.
//
// The layout walked by this function is:
//   <top-level object>
//     "content": array, whose first element is an object that may carry
//        "error":         object with integer "code" and string "message"
//        "result":        integer, forwarded to cntl->set_idl_result()
//        "result_params": object holding the user response. When
//                         cntl->idl_names().response_name is non-empty, the
//                         user response is the field of that name inside
//                         "result_params" rather than the object itself.
//
// Params:
//   cntl  receives the failure (via SetFailed) or the idl result.
//   buf   the serialized reply. It is modified: on the success path it is
//         trimmed at both ends down to the bytes of the user response
//         before being handed to the message parser.
//   res   destination message. When NULL the response is ignored and the
//         function returns without touching `cntl` or `buf`.
//
// Every structural problem in the reply is reported with ERESPONSE. When the
// reply carries an "error" object with a non-zero code and a non-empty
// message, the controller is failed with that code and message instead.
static void ParseResponse(Controller* cntl, butil::IOBuf& buf,
                          google::protobuf::Message* res) {
    if (res == NULL) {
        // silently ignore response.
        return;
    }
    const std::string& msg_name = res->GetDescriptor()->full_name();
    mcpack2pb::MessageHandler handler = mcpack2pb::find_message_handler(msg_name);
    if (handler.parse_body == NULL) {
        return cntl->SetFailed(ERESPONSE, "Fail to find parser of %s",
                               msg_name.c_str());
    }
    butil::IOBufAsZeroCopyInputStream zc_stream(buf);
    mcpack2pb::InputStream stream(&zc_stream);
    if (!::mcpack2pb::unbox(&stream)) {
        cntl->SetFailed(ERESPONSE, "Response is not a compack/mcpack2 object");
        return;
    }

    // Scan the top-level object for the "content" field.
    mcpack2pb::ObjectIterator it1(&stream, buf.size() - stream.popped_bytes());
    bool found_content = false;
    for (; it1 != NULL; ++it1) {
        if (it1->name == "content") {
            found_content = true;
            break;
        }
    }
    if (!found_content) {
        cntl->SetFailed(ERESPONSE, "Fail to find response.content");
        return;
    }
    if (it1->value.type() != mcpack2pb::FIELD_ARRAY) {
        cntl->SetFailed(ERESPONSE, "Expect response.content to be array,"
                        " actually %s", mcpack2pb::type2str(it1->value.type()));
        return;
    }
    mcpack2pb::ArrayIterator it2(it1->value);
    if (it2 == NULL) {
        // Pass ERESPONSE explicitly so this path reports the same error code
        // as every other malformed-response path in this function.
        cntl->SetFailed(ERESPONSE, "Fail to parse response.content as array");
        return;
    }

    bool has_result_params = false;
    // Position (in popped bytes of `stream`) and length of the user response.
    size_t user_res_offset = 0;
    size_t user_res_size = 0;
    // Only used to label the final parse error.
    const char* response_name = "result_params";
    for (mcpack2pb::ObjectIterator it3(*it2); it3 != NULL; ++it3) {
        if (it3->name == "error") {
            if (it3->value.type() != mcpack2pb::FIELD_OBJECT) {
                cntl->SetFailed(ERESPONSE, "Expect response.content[0].error"
                                " to be object, actually %s",
                                mcpack2pb::type2str(it3->value.type()));
                return;
            }
            // code == 0 doubles as "not seen yet": an explicit 0 is rejected
            // inside the loop, so 0 after the loop means the field is absent.
            int32_t code = 0;
            std::string msg;
            for (mcpack2pb::ObjectIterator it4(it3->value); it4 != NULL; ++it4) {
                if (it4->name == "code") {
                    if (!mcpack2pb::is_primitive(it4->value.type()) ||
                        !mcpack2pb::is_integral(
                            (mcpack2pb::PrimitiveFieldType)it4->value.type())) {
                        cntl->SetFailed(
                            ERESPONSE, "Expect response.content[0].error.code "
                            "to be integer, actually %s",
                            mcpack2pb::type2str(it4->value.type()));
                        return;
                    }
                    code = it4->value.as_int32("response.content[0].error.code");
                    if (code == 0) {
                        cntl->SetFailed(ERESPONSE,
                                        "response.content[0].error.code is 0");
                        return;
                    }
                } else if (it4->name == "message") {
                    if (it4->value.type() != mcpack2pb::FIELD_STRING) {
                        cntl->SetFailed(
                            ERESPONSE, "Expect response.content[0].error."
                            "message to be string, actually %s",
                            mcpack2pb::type2str(it4->value.type()));
                        return;
                    }
                    it4->value.as_string(&msg, "response.content[0].error.message");
                } // else field "data" (probably non-ASCII) is ignored.
            }
            if (code == 0) {
                cntl->SetFailed(ERESPONSE,
                                "Fail to find response.content[0].error.code");
                return;
            }
            if (msg.empty()) {
                cntl->SetFailed(ERESPONSE,
                                "Fail to find response.content[0].error.message");
                return;
            }
            // Surface the error carried by the reply itself.
            cntl->SetFailed(code, "%s", msg.c_str());
            return;  // no need to parse left fields.
        } else if (it3->name == "result") {
            if (!mcpack2pb::is_primitive(it3->value.type()) ||
                !mcpack2pb::is_integral((mcpack2pb::PrimitiveFieldType)it3->value.type())) {
                cntl->SetFailed(ERESPONSE, "Expect response.content[0].result"
                                " to be integer, actually %s",
                                mcpack2pb::type2str(it3->value.type()));
                return;
            }
            cntl->set_idl_result(it3->value.as_int64("response.content[0].result"));
        } else if (it3->name == "result_params") {
            if (it3->value.type() != mcpack2pb::FIELD_OBJECT) {
                cntl->SetFailed(ERESPONSE, "Expect response.content[0].result_params"
                                " to be object, actually %s",
                                mcpack2pb::type2str(it3->value.type()));
                return;
            }
            has_result_params = true;
            // By default the whole "result_params" object is the user
            // response; remember where the stream currently stands and how
            // many bytes the value occupies.
            user_res_offset = stream.popped_bytes();
            user_res_size = it3->value.size();
            const size_t stream_end = stream.popped_bytes() + it3->value.size();
            const char* const expname = cntl->idl_names().response_name;
            if (expname != NULL && *expname) {
                // A response name was configured: narrow the user response to
                // the field of that name inside "result_params".
                mcpack2pb::ObjectIterator it4(it3->value);
                bool found_response_name = false;
                for (; it4 != NULL; ++it4) {
                    if (it4->name == expname) {
                        found_response_name = true;
                        break;
                    }
                }
                if (!found_response_name) {
                    cntl->SetFailed(ERESPONSE, "Fail to find response."
                                    "content[0].result_params.%s", expname);
                    return;
                }
                response_name = expname;
                user_res_offset = stream.popped_bytes();
                user_res_size = it4->value.size();
            }
            // Pop left bytes, otherwise ++it3 may complain about
            // "not fully consumed".
            if (stream_end > stream.popped_bytes()) {
                stream.popn(stream_end - stream.popped_bytes());
            }
        }
    }
    if (!has_result_params) {
        cntl->SetFailed(ERESPONSE,
                        "Fail to find response.content[0].result_params");
        return;
    }

    // Trim `buf` to exactly the bytes of the user response: drop everything
    // before it, then everything after it.
    buf.pop_front(user_res_offset);
    if (buf.size() != user_res_size) {
        if (buf.size() < user_res_size) {
            cntl->SetFailed(ERESPONSE, "response_size=%" PRIu64 " is shorter "
                            "than specified=%" PRIu64, (uint64_t)buf.size(),
                            (uint64_t)user_res_size);
            return;
        }
        buf.pop_back(buf.size() - user_res_size);
    }
    butil::IOBufAsZeroCopyInputStream bufstream(buf);
    if (!handler.parse_body(res, &bufstream, buf.size())) {
        cntl->SetFailed(ERESPONSE, "Fail to parse %s from response.content[0]."
                        "result_params.%s", msg_name.c_str(), response_name);
        return;
    }
}