in src/brpc/policy/http_rpc_protocol.cpp [345:561]
// Client-side handler of a parsed HTTP (or HTTP/2, gRPC-over-HTTP/2) response.
//
// Steps, in order:
//   1. Find the correlation id of the RPC: from the H2StreamContext for
//      HTTP/2, otherwise from the socket.
//   2. Lock the correlation id to get the Controller of the RPC.
//   3. Fill span fields (if a span exists) and swap the parsed header into
//      cntl->http_response().
//   4. Inside the do/while(0) block, decide the outcome: fail the RPC, hand
//      the body over as response_attachment(), or parse the body into
//      cntl->response() according to content-type / encoding.
//   5. Release the message and call accessor.OnResponse(cid, saved_error).
//
// `msg' is statically cast to HttpContext (and to H2StreamContext when the
// header says HTTP/2), so it must really be of that type.
// NOTE(review): imsg_guard is a DestroyingPtr; presumably it destroys `msg'
// on every return path -- confirm against DestroyingPtr's definition.
void ProcessHttpResponse(InputMessageBase* msg) {
const int64_t start_parse_us = butil::cpuwide_time_us();
DestroyingPtr<HttpContext> imsg_guard(static_cast<HttpContext*>(msg));
Socket* socket = imsg_guard->socket();
uint64_t cid_value;
// is_http2 must be read here: the header is swapped into the controller
// further below, after which imsg_guard->header() no longer holds the
// parsed response header.
const bool is_http2 = imsg_guard->header().is_http2();
if (is_http2) {
H2StreamContext* h2_sctx = static_cast<H2StreamContext*>(msg);
cid_value = h2_sctx->correlation_id();
} else {
cid_value = socket->correlation_id();
}
// A zero correlation id is treated as "no RPC to deliver this response to".
if (cid_value == 0) {
LOG(WARNING) << "Fail to find correlation_id from " << *socket;
return;
}
const bthread_id_t cid = { cid_value };
Controller* cntl = NULL;
// On success `cntl' is filled with the data attached to the id, and the id
// stays locked until accessor.OnResponse() at the end of this function.
const int rc = bthread_id_lock(cid, (void**)&cntl);
if (rc != 0) {
// EINVAL and EPERM are deliberately not logged.
// NOTE(review): presumably they mean the RPC already ended -- confirm.
LOG_IF(ERROR, rc != EINVAL && rc != EPERM)
<< "Fail to lock correlation_id=" << cid << ": " << berror(rc);
return;
}
ControllerPrivateAccessor accessor(cntl);
Span* span = accessor.span();
if (span) {
span->set_base_real_us(msg->base_real_us());
span->set_received_us(msg->received_us());
// TODO: changing when imsg_guard->read_body_progressively() is true
span->set_response_size(imsg_guard->parsed_length());
span->set_start_parse_us(start_parse_us);
}
// Hand the parsed header over to the controller. From here on the response
// header must be accessed through res_header.
HttpHeader* res_header = &cntl->http_response();
res_header->Swap(imsg_guard->header());
butil::IOBuf& res_body = imsg_guard->body();
CHECK(cntl->response_attachment().empty());
// Error code of the controller before this response is processed; passed
// to OnResponse() at the end.
const int saved_error = cntl->ErrorCode();
bool is_grpc_ct = false;
const HttpContentType content_type =
ParseContentType(res_header->content_type(), &is_grpc_ct);
// A response is handled as gRPC only when it is HTTP/2 AND its content-type
// was recognized as gRPC.
const bool is_grpc = (is_http2 && is_grpc_ct);
bool grpc_compressed = false; // only valid when is_grpc is true.
// Single-pass block: every `break' below jumps to the common epilogue
// (release of the message + OnResponse).
do {
if (!is_http2) {
// If header has "Connection: close", close the connection.
const std::string* conn_cmd = res_header->GetHeader(common->CONNECTION);
if (conn_cmd != NULL && 0 == strcasecmp(conn_cmd->c_str(), "close")) {
// Server asked to close the connection.
if (imsg_guard->read_body_progressively()) {
// Close the socket when reading completes.
socket->read_will_be_progressive(CONNECTION_TYPE_SHORT);
} else {
socket->SetFailed();
}
}
} else if (is_grpc) {
// Strip the gRPC message prefix from the body; grpc_compressed is
// filled by the helper and consulted later when picking the encoding.
if (!RemoveGrpcPrefix(&res_body, &grpc_compressed)) {
cntl->SetFailed(ERESPONSE, "Invalid gRPC response");
break;
}
// A missing grpc-status header is not treated as an error here.
const std::string* grpc_status = res_header->GetHeader(common->GRPC_STATUS);
if (grpc_status) {
// TODO: More strict parsing
GrpcStatus status = (GrpcStatus)strtol(grpc_status->data(), NULL, 10);
if (status != GRPC_OK) {
// Prefer the (percent-decoded) grpc-message as error text and
// fall back to the textual name of the status.
const std::string* grpc_message =
res_header->GetHeader(common->GRPC_MESSAGE);
if (grpc_message) {
std::string message_decoded;
PercentDecode(*grpc_message, &message_decoded);
cntl->SetFailed(GrpcStatusToErrorCode(status), "%s",
message_decoded.c_str());
} else {
cntl->SetFailed(GrpcStatusToErrorCode(status), "%s",
GrpcStatusToString(status));
}
break;
}
}
}
// Progressive reading: the body is not complete at this point, so it can
// neither be swapped into the attachment nor parsed as protobuf. This
// branch always leaves the block.
if (imsg_guard->read_body_progressively()) {
// Set RPA if needed
accessor.set_readable_progressive_attachment(imsg_guard.get());
const int sc = res_header->status_code();
if (sc < 200 || sc >= 300) {
// Even if the body is for streaming purpose, a non-OK status
// code indicates that the body is probably the error text
// which is helpful for debugging.
// content may be binary data, so the size limit is a must.
// copy_to() is used so that the data copied into the error text is
// still left in res_body.
// NOTE(review): (int)res_body.size() would wrap for bodies larger
// than INT_MAX bytes -- confirm an upstream size limit prevents that.
std::string body_str;
res_body.copy_to(
&body_str, std::min((int)res_body.size(),
FLAGS_http_max_error_length));
cntl->SetFailed(EHTTP, "HTTP/%d.%d %d %s: %.*s",
res_header->major_version(),
res_header->minor_version(),
static_cast<int>(res_header->status_code()),
res_header->reason_phrase(),
(int)body_str.size(), body_str.c_str());
} else if (cntl->response() != NULL &&
cntl->response()->GetDescriptor()->field_count() != 0) {
// The caller expects a protobuf response with fields, which is
// incompatible with a progressively-read body.
cntl->SetFailed(ERESPONSE, "A protobuf response can't be parsed"
" from progressively-read HTTP body");
}
break;
}
// Fail RPC if status code is an error in http sense.
// ErrorCode of RPC is unified to EHTTP.
const int sc = res_header->status_code();
if (sc < 200 || sc >= 300) {
std::string err = butil::string_printf(
"HTTP/%d.%d %d %s",
res_header->major_version(),
res_header->minor_version(),
static_cast<int>(res_header->status_code()),
res_header->reason_phrase());
if (!res_body.empty()) {
// Use content as error text if it's present. Notice that
// content may be binary data, so the size limit is a must.
// NOTE(review): same (int) narrowing of res_body.size() as in the
// progressive branch above.
err.append(": ");
res_body.append_to(
&err, std::min((int)res_body.size(),
FLAGS_http_max_error_length));
}
// If server return brpc error code by x-bd-error-code,
// set the returned error code to controller. Otherwise,
// set EHTTP to controller uniformly.
// A missing header yields error_code 0, which selects EHTTP below.
const std::string* error_code_ptr = res_header->GetHeader(common->ERROR_CODE);
int error_code = error_code_ptr ? strtol(error_code_ptr->data(), NULL, 10) : 0;
if (FLAGS_use_http_error_code && error_code != 0) {
cntl->SetFailed(error_code, "%s", err.c_str());
} else {
cntl->SetFailed(EHTTP, "%s", err.c_str());
}
if (cntl->response() == NULL ||
cntl->response()->GetDescriptor()->field_count() == 0) {
// A http call. Http users may need the body(containing a html,
// json etc) even if the http call was failed. This is different
// from protobuf services where responses are undefined when RPC
// was failed.
cntl->response_attachment().swap(res_body);
}
break;
}
// Status code is 2xx from here on.
if (cntl->response() == NULL ||
cntl->response()->GetDescriptor()->field_count() == 0) {
// a http call, content is the "real response".
cntl->response_attachment().swap(res_body);
break;
}
// A protobuf response with fields is expected: pick the header naming the
// body encoding. For gRPC the header is only consulted (and required)
// when the message was flagged as compressed; otherwise Content-Encoding
// is used.
const std::string* encoding = NULL;
if (is_grpc) {
if (grpc_compressed) {
encoding = res_header->GetHeader(common->GRPC_ENCODING);
if (encoding == NULL) {
cntl->SetFailed(ERESPONSE, "Fail to find header `grpc-encoding' "
"in compressed gRPC response");
break;
}
}
} else {
encoding = res_header->GetHeader(common->CONTENT_ENCODING);
}
// Only the gzip encoding is decompressed here; with any other (or no)
// encoding the body is passed to the parsers below unchanged.
if (encoding != NULL && *encoding == common->GZIP) {
TRACEPRINTF("Decompressing response=%lu",
(unsigned long)res_body.size());
butil::IOBuf uncompressed;
if (!policy::GzipDecompress(res_body, &uncompressed)) {
cntl->SetFailed(ERESPONSE, "Fail to un-gzip response body");
break;
}
res_body.swap(uncompressed);
}
// Parse the body into cntl->response() according to the content-type
// computed right after the header swap.
if (content_type == HTTP_CONTENT_PROTO) {
if (!ParsePbFromIOBuf(cntl->response(), res_body)) {
cntl->SetFailed(ERESPONSE, "Fail to parse content as %s",
cntl->response()->GetDescriptor()->full_name().c_str());
break;
}
} else if (content_type == HTTP_CONTENT_PROTO_TEXT) {
if (!ParsePbTextFromIOBuf(cntl->response(), res_body)) {
cntl->SetFailed(ERESPONSE, "Fail to parse proto-text content as %s",
cntl->response()->GetDescriptor()->full_name().c_str());
break;
}
} else if (content_type == HTTP_CONTENT_JSON) {
// Message body is json.
// NOTE(review): no SetFailed here; presumably the helper reports the
// failure through the `cntl' and ERESPONSE arguments -- confirm.
if (!JsonToProtoMessage(res_body, cntl->response(), cntl, ERESPONSE)) {
break;
}
} else if (content_type == HTTP_CONTENT_PROTO_JSON) {
// Message body is json, converted by the proto-json helper.
// NOTE(review): as above, the helper presumably sets the error itself.
if (!ProtoJsonToProtoMessage(res_body, cntl->response(), cntl, ERESPONSE)) {
break;
}
} else {
cntl->SetFailed(ERESPONSE,
"Unknown content-type=%s when response is not NULL",
res_header->content_type().c_str());
break;
}
} while (0);
// Release the message before notifying the controller.
// OnResponse() unlocks correlation_id inside and reverts the controller's
// error code to saved_error if the version check of `cid' fails.
imsg_guard.reset();
accessor.OnResponse(cid, saved_error);
}