in src/brpc/policy/thrift_protocol.cpp [239:366]
// Serializes the outcome of a finished thrift call into one framed message
// and writes it to the socket returned by get_sending_socket(). The closure
// deletes itself when this function returns.
//
// Exactly one of three payloads is produced:
//   - a T_EXCEPTION message carrying `_controller.ErrorText()' when the
//     controller is in failed state;
//   - a T_REPLY message wrapping `_response.raw_instance()' when that
//     instance is non-null;
//   - otherwise a T_REPLY message-begin followed by the bytes already held
//     in `_response.body'.
// Nothing is written, and the socket is set failed instead, when the
// controller requests closing the connection or carries no log_id.
void ThriftClosure::DoRun() {
    // Recycle itself after `Run'.
    // Declared first so that it is destroyed last: `concurrency_remover'
    // below is constructed with `&_controller' and is destroyed before
    // this object is deleted.
    std::unique_ptr<ThriftClosure> recycle_ctx(this);
    const Server* server = _controller.server();
    ControllerPrivateAccessor accessor(&_controller);
    Span* span = accessor.span();
    if (span) {
        span->set_start_send_us(butil::cpuwide_time_us());
    }
    Socket* sock = accessor.get_sending_socket();
    // NULL when the server has no thrift_service configured.
    MethodStatus* method_status = (server->options().thrift_service ?
        server->options().thrift_service->_status : NULL);
    ConcurrencyRemover concurrency_remover(method_status, &_controller, _received_us);
    if (!method_status) {
        // Judge errors belongings.
        // may not be accurate, but it does not matter too much.
        const int error_code = _controller.ErrorCode();
        if (error_code == ENOSERVICE ||
            error_code == ENOMETHOD ||
            error_code == EREQUEST ||
            error_code == ECLOSE ||
            error_code == ELOGOFF ||
            error_code == ELIMIT) {
            ServerPrivateAccessor(server).AddError();
        }
    }
    if (_controller.IsCloseConnection() ||
        // seq_id is not read yet, no valid response can be sent back
        !_controller.has_log_id()) {
        sock->SetFailed();
        return;
    }

    // Validate what goes into the message-begin. A violation only marks the
    // controller as failed, so the client still receives a T_EXCEPTION reply
    // through the first branch below.
    const std::string& method_name = _controller.thrift_method_name();
    if (method_name.empty() || method_name[0] == ' ') {
        _controller.SetFailed(ENOMETHOD, "Invalid thrift_method_name!");
    }
    if (method_name.size() > MAX_THRIFT_METHOD_NAME_LENGTH) {
        _controller.SetFailed(ENOMETHOD, "thrift_method_name is too long");
    }
    // log_id carries the thrift seq_id, which must fit in 32 bits.
    if (_controller.log_id() > (uint64_t)0xffffffff) {
        _controller.SetFailed(ERESPONSE, "Invalid thrift seq_id=%" PRIu64,
                              _controller.log_id());
    }
    const uint32_t seq_id = (uint32_t)_controller.log_id();

    butil::IOBuf write_buf;
    // The following code was taken and modified from thrift auto generated code
    if (_controller.Failed()) {
        // Reply with a TApplicationException built from the error text.
        auto out_buffer =
            THRIFT_STDCXX::make_shared<apache::thrift::transport::TMemoryBuffer>();
        apache::thrift::protocol::TBinaryProtocolT<apache::thrift::transport::TMemoryBuffer> oprot(out_buffer);
        ::apache::thrift::TApplicationException x(_controller.ErrorText());
        oprot.writeMessageBegin(
            method_name, ::apache::thrift::protocol::T_EXCEPTION, seq_id);
        x.write(&oprot);
        oprot.writeMessageEnd();
        oprot.getTransport()->writeEnd();
        oprot.getTransport()->flush();
        uint8_t* buf;
        uint32_t sz;
        out_buffer->getBuffer(&buf, &sz);
        // Frame head (payload length in network byte order), then payload.
        const thrift_head_t head = { htonl(sz) };
        write_buf.append(&head, sizeof(head));
        write_buf.append(buf, sz);
    } else if (_response.raw_instance()) {
        // Serialize the response instance as field `success' of a result
        // struct inside a T_REPLY message.
        auto out_buffer =
            THRIFT_STDCXX::make_shared<apache::thrift::transport::TMemoryBuffer>();
        apache::thrift::protocol::TBinaryProtocolT<apache::thrift::transport::TMemoryBuffer> oprot(out_buffer);
        oprot.writeMessageBegin(
            method_name, ::apache::thrift::protocol::T_REPLY, seq_id);
        uint32_t xfer = 0;
        xfer += oprot.writeStructBegin("rpc_result"); // can be any valid name
        xfer += oprot.writeFieldBegin("success",
                                      ::apache::thrift::protocol::T_STRUCT,
                                      THRIFT_RESPONSE_FID);
        xfer += _response.raw_instance()->Write(&oprot);
        xfer += oprot.writeFieldEnd();
        xfer += oprot.writeFieldStop();
        xfer += oprot.writeStructEnd();
        oprot.writeMessageEnd();
        oprot.getTransport()->writeEnd();
        oprot.getTransport()->flush();
        uint8_t* buf;
        uint32_t sz;
        out_buffer->getBuffer(&buf, &sz);
        // Frame head (payload length in network byte order), then payload.
        const thrift_head_t head = { htonl(sz) };
        write_buf.append(&head, sizeof(head));
        write_buf.append(buf, sz);
    } else {
        // `_response.body' is sent as-is; only the frame head and the
        // message-begin are generated here.
        const size_t mb_size = ThriftMessageBeginSize(method_name);
        // Build the head as a real thrift_head_t and append it on its own,
        // as the two branches above do, instead of storing through a
        // thrift_head_t* that aliases a char buffer (a type-punned write
        // with no alignment guarantee).
        // NOTE(review): the length is narrowed to 32 bits here; a body whose
        // size does not fit would wrap silently - confirm an upper bound is
        // enforced elsewhere.
        const uint32_t frame_len =
            static_cast<uint32_t>(mb_size + _response.body.size());
        const thrift_head_t head = { htonl(frame_len) };
        write_buf.append(&head, sizeof(head));
        // Variable-length array (compiler extension). Its size depends on
        // method_name, and names longer than MAX_THRIFT_METHOD_NAME_LENGTH
        // never reach this branch because the controller is failed above.
        char mb_buf[mb_size];
        WriteThriftMessageBegin(mb_buf, method_name,
                                ::apache::thrift::protocol::T_REPLY, seq_id);
        write_buf.append(mb_buf, mb_size);
        write_buf.append(_response.body.movable());
    }

    if (span) {
        span->set_response_size(write_buf.size());
    }
    // Have the risk of unlimited pending responses, in which case, tell
    // users to set max_concurrency.
    Socket::WriteOptions wopt;
    wopt.ignore_eovercrowded = true;
    if (sock->Write(&write_buf, &wopt) != 0) {
        // Save errno before any further call can overwrite it.
        const int errcode = errno;
        PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
        _controller.SetFailed(errcode, "Fail to write into %s",
                              sock->description().c_str());
        return;
    }
    if (span) {
        // TODO: this is not sent
        span->set_sent_us(butil::cpuwide_time_us());
    }
}