in src/brpc/policy/baidu_rpc_protocol.cpp [263:449]
void SendRpcResponse(int64_t correlation_id, Controller* cntl,
RpcPBMessages* messages, const Server* server,
MethodStatus* method_status, int64_t received_us) {
ControllerPrivateAccessor accessor(cntl);
Span* span = accessor.span();
if (span) {
span->set_start_send_us(butil::cpuwide_time_us());
}
Socket* sock = accessor.get_sending_socket();
const google::protobuf::Message* req = NULL == messages ? NULL : messages->Request();
const google::protobuf::Message* res = NULL == messages ? NULL : messages->Response();
// Recycle resources at the end of this function.
BRPC_SCOPE_EXIT {
{
// Remove concurrency and record latency at first.
ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);
}
std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
if (NULL == messages) {
return;
}
cntl->CallAfterRpcResp(req, res);
if (NULL == server->options().baidu_master_service) {
server->options().rpc_pb_message_factory->Return(messages);
} else {
BaiduProxyPBMessages::Return(static_cast<BaiduProxyPBMessages*>(messages));
}
};
StreamIds response_stream_ids = accessor.response_streams();
if (cntl->IsCloseConnection()) {
for(size_t i = 0; i < response_stream_ids.size(); ++i) {
StreamClose(response_stream_ids[i]);
}
sock->SetFailed();
return;
}
bool append_body = false;
butil::IOBuf res_body;
// `res' can be NULL here, in which case we don't serialize it
// If user calls `SetFailed' on Controller, we don't serialize
// response either
CompressType compress_type = cntl->response_compress_type();
if (res != NULL && !cntl->Failed()) {
append_body = SerializeResponse(*res, *cntl, res_body);
}
// Don't use res->ByteSize() since it may be compressed
size_t res_size = 0;
size_t attached_size = 0;
if (append_body) {
res_size = res_body.length();
attached_size = cntl->response_attachment().length();
}
int error_code = cntl->ErrorCode();
if (error_code == -1) {
// replace general error (-1) with INTERNAL_SERVER_ERROR to make a
// distinction between server error and client error
error_code = EINTERNAL;
}
RpcMeta meta;
RpcResponseMeta* response_meta = meta.mutable_response();
response_meta->set_error_code(error_code);
if (!cntl->ErrorText().empty()) {
// Only set error_text when it's not empty since protobuf Message
// always new the string no matter if it's empty or not.
response_meta->set_error_text(cntl->ErrorText());
}
meta.set_correlation_id(correlation_id);
meta.set_compress_type(compress_type);
meta.set_content_type(cntl->response_content_type());
if (attached_size > 0) {
meta.set_attachment_size(attached_size);
}
StreamId response_stream_id = INVALID_STREAM_ID;
SocketUniquePtr stream_ptr;
if (!response_stream_ids.empty()) {
response_stream_id = response_stream_ids[0];
if (Socket::Address(response_stream_id, &stream_ptr) == 0) {
Stream* s = (Stream *) stream_ptr->conn();
StreamSettings *stream_settings = meta.mutable_stream_settings();
s->FillSettings(stream_settings);
s->SetHostSocket(sock);
for (size_t i = 1; i < response_stream_ids.size(); ++i) {
stream_settings->mutable_extra_stream_ids()->Add(response_stream_ids[i]);
}
} else {
LOG(WARNING) << "Stream=" << response_stream_id
<< " was closed before sending response";
}
}
if (cntl->has_response_user_fields() &&
!cntl->response_user_fields()->empty()) {
::google::protobuf::Map<std::string, std::string>& user_fields
= *meta.mutable_user_fields();
user_fields.insert(cntl->response_user_fields()->begin(),
cntl->response_user_fields()->end());
}
butil::IOBuf res_buf;
SerializeRpcHeaderAndMeta(&res_buf, meta, res_size + attached_size);
if (append_body) {
res_buf.append(res_body.movable());
if (attached_size > 0) {
res_buf.append(cntl->response_attachment().movable());
}
}
ResponseWriteInfo args;
bthread_id_t response_id = INVALID_BTHREAD_ID;
if (span) {
span->set_response_size(res_buf.size());
CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten));
}
// Send rpc response over stream even if server side failed to create
// stream for some reason.
if (cntl->has_remote_stream()) {
// Send the response over stream to notify that this stream connection
// is successfully built.
// Response_stream can be INVALID_STREAM_ID when error occurs.
if (SendStreamData(sock, &res_buf,
accessor.remote_stream_settings()->stream_id(),
response_stream_id, response_id) != 0) {
error_code = errno;
PLOG_IF(WARNING, error_code != EPIPE)
<< "Fail to write into " << sock->description();
cntl->SetFailed(error_code, "Fail to write into %s",
sock->description().c_str());
Stream::SetFailed(response_stream_ids, error_code,
"Fail to write into %s",
sock->description().c_str());
return;
}
// Now it's ok the mark these server-side streams as connected as all the
// written user data would follower the RPC response.
// Reuse stream_ptr to avoid address first stream id again
if (stream_ptr) {
((Stream*)stream_ptr->conn())->SetConnected();
}
for (size_t i = 1; i < response_stream_ids.size(); ++i) {
StreamId extra_stream_id = response_stream_ids[i];
SocketUniquePtr extra_stream_ptr;
if (Socket::Address(extra_stream_id, &extra_stream_ptr) == 0) {
Stream* extra_stream = (Stream *) extra_stream_ptr->conn();
extra_stream->SetHostSocket(sock);
extra_stream->SetConnected();
} else {
LOG(WARNING) << "Stream=" << extra_stream_id
<< " was closed before sending response";
}
}
} else{
// Have the risk of unlimited pending responses, in which case, tell
// users to set max_concurrency.
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (INVALID_BTHREAD_ID != response_id) {
wopt.id_wait = response_id;
wopt.notify_on_success = true;
}
if (sock->Write(&res_buf, &wopt) != 0) {
const int errcode = errno;
PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
cntl->SetFailed(errcode, "Fail to write into %s",
sock->description().c_str());
return;
}
}
if (span) {
bthread_id_join(response_id);
// Do not care about the result of background writing.
// TODO: this is not sent
span->set_sent_us(args.sent_us);
}
}