in src/brpc/policy/http_rpc_protocol.cpp [563:729]
void SerializeHttpRequest(butil::IOBuf* /*not used*/,
Controller* cntl,
const google::protobuf::Message* pbreq) {
HttpHeader& hreq = cntl->http_request();
const bool is_http2 = (cntl->request_protocol() == PROTOCOL_H2);
bool is_grpc = false;
ControllerPrivateAccessor accessor(cntl);
if (!accessor.protocol_param().empty() && hreq.content_type().empty()) {
const std::string& param = accessor.protocol_param();
if (param.find('/') == std::string::npos) {
std::string& s = hreq.mutable_content_type();
s.reserve(12 + param.size());
s.append("application/");
s.append(param);
} else {
hreq.set_content_type(param);
}
}
if (pbreq != NULL) {
// If request is not NULL, message body will be serialized proto/json,
if (!pbreq->IsInitialized()) {
return cntl->SetFailed(
EREQUEST, "Missing required fields in request: %s",
pbreq->InitializationErrorString().c_str());
}
if (!cntl->request_attachment().empty()) {
return cntl->SetFailed(EREQUEST, "request_attachment must be empty "
"when request is not NULL");
}
HttpContentType content_type = HTTP_CONTENT_OTHERS;
if (hreq.content_type().empty()) {
// Set content-type if user did not.
// Note that http1.x defaults to json and h2 defaults to pb.
if (is_http2) {
content_type = HTTP_CONTENT_PROTO;
hreq.set_content_type(common->CONTENT_TYPE_PROTO);
} else {
content_type = HTTP_CONTENT_JSON;
hreq.set_content_type(common->CONTENT_TYPE_JSON);
}
} else {
bool is_grpc_ct = false;
content_type = ParseContentType(hreq.content_type(), &is_grpc_ct);
is_grpc = (is_http2 && is_grpc_ct);
}
butil::IOBufAsZeroCopyOutputStream wrapper(&cntl->request_attachment());
if (content_type == HTTP_CONTENT_PROTO) {
// Serialize content as protobuf
if (!pbreq->SerializeToZeroCopyStream(&wrapper)) {
cntl->request_attachment().clear();
return cntl->SetFailed(EREQUEST, "Fail to serialize %s",
pbreq->GetTypeName().c_str());
}
} else if (content_type == HTTP_CONTENT_PROTO_TEXT) {
if (!google::protobuf::TextFormat::Print(*pbreq, &wrapper)) {
cntl->request_attachment().clear();
return cntl->SetFailed(EREQUEST, "Fail to print %s as proto-text",
pbreq->GetTypeName().c_str());
}
} else if (content_type == HTTP_CONTENT_PROTO_JSON) {
if (!ProtoMessageToProtoJson(*pbreq, &wrapper, cntl, EREQUEST)) {
cntl->request_attachment().clear();
return;
}
} else if (content_type == HTTP_CONTENT_JSON) {
if (!ProtoMessageToJson(*pbreq, &wrapper, cntl, EREQUEST)) {
cntl->request_attachment().clear();
return;
}
} else {
return cntl->SetFailed(
EREQUEST, "Cannot serialize pb request according to content_type=%s",
hreq.content_type().c_str());
}
} else {
// Use request_attachment.
// TODO: Checking required fields of http header.
}
// Make RPC fail if uri() is not OK (previous SetHttpURL/operator= failed)
if (!hreq.uri().status().ok()) {
return cntl->SetFailed(EREQUEST, "%s",
hreq.uri().status().error_cstr());
}
bool grpc_compressed = false;
if (cntl->request_compress_type() != COMPRESS_TYPE_NONE) {
if (cntl->request_compress_type() != COMPRESS_TYPE_GZIP) {
return cntl->SetFailed(EREQUEST, "http does not support %s",
CompressTypeToCStr(cntl->request_compress_type()));
}
const size_t request_size = cntl->request_attachment().size();
if (request_size >= (size_t)FLAGS_http_body_compress_threshold) {
TRACEPRINTF("Compressing request=%lu", (unsigned long)request_size);
butil::IOBuf compressed;
if (GzipCompress(cntl->request_attachment(), &compressed, NULL)) {
cntl->request_attachment().swap(compressed);
if (is_grpc) {
grpc_compressed = true;
hreq.SetHeader(common->GRPC_ENCODING, common->GZIP);
} else {
hreq.SetHeader(common->CONTENT_ENCODING, common->GZIP);
}
} else {
cntl->SetFailed("Fail to gzip the request body, skip compressing");
}
}
}
// Fill log-id if user set it.
if (cntl->has_log_id()) {
hreq.SetHeader(common->LOG_ID,
butil::string_printf("%llu", (unsigned long long)cntl->log_id()));
}
if (!cntl->request_id().empty()) {
hreq.SetHeader(FLAGS_request_id_header, cntl->request_id());
}
if (!is_http2) {
// HTTP before 1.1 needs to set keep-alive explicitly.
if (hreq.before_http_1_1() &&
cntl->connection_type() != CONNECTION_TYPE_SHORT &&
hreq.GetHeader(common->CONNECTION) == NULL) {
hreq.SetHeader(common->CONNECTION, common->KEEP_ALIVE);
}
} else {
cntl->set_stream_creator(get_h2_global_stream_creator());
if (is_grpc) {
/*
hreq.SetHeader(common->GRPC_ACCEPT_ENCODING,
common->GRPC_ACCEPT_ENCODING_VALUE);
*/
// TODO: do we need this?
hreq.SetHeader(common->TE, common->TRAILERS);
if (cntl->timeout_ms() >= 0) {
hreq.SetHeader(common->GRPC_TIMEOUT,
butil::string_printf("%" PRId64 "m", cntl->timeout_ms()));
}
// Append compressed and length before body
AddGrpcPrefix(&cntl->request_attachment(), grpc_compressed);
}
}
// Set url to /ServiceName/MethodName when we're about to call protobuf
// services (indicated by non-NULL method).
const google::protobuf::MethodDescriptor* method = cntl->method();
if (method != NULL) {
hreq.set_method(HTTP_METHOD_POST);
std::string path;
path.reserve(2 + method->service()->full_name().size()
+ method->name().size());
path.push_back('/');
path.append(method->service()->full_name());
path.push_back('/');
path.append(method->name());
hreq.uri().set_path(path);
}
Span* span = accessor.span();
if (span) {
hreq.SetHeader("x-bd-trace-id", butil::string_printf(
"%llu", (unsigned long long)span->trace_id()));
hreq.SetHeader("x-bd-span-id", butil::string_printf(
"%llu", (unsigned long long)span->span_id()));
hreq.SetHeader("x-bd-parent-span-id", butil::string_printf(
"%llu", (unsigned long long)span->parent_span_id()));
}
}