void ThriftClosure::DoRun()

in src/brpc/policy/thrift_protocol.cpp [239:366]


void ThriftClosure::DoRun() {
    // Recycle itself after `Run'
    std::unique_ptr<ThriftClosure> recycle_ctx(this);
    const Server* server = _controller.server();

    ControllerPrivateAccessor accessor(&_controller);
    Span* span = accessor.span();
    if (span) {
        span->set_start_send_us(butil::cpuwide_time_us());
    }
    Socket* sock = accessor.get_sending_socket();
    MethodStatus* method_status = (server->options().thrift_service ? 
        server->options().thrift_service->_status : NULL);
    ConcurrencyRemover concurrency_remover(method_status, &_controller, _received_us);
    if (!method_status) {
        // Judge errors belongings.
        // may not be accurate, but it does not matter too much.
        const int error_code = _controller.ErrorCode();
        if (error_code == ENOSERVICE ||
            error_code == ENOMETHOD ||
            error_code == EREQUEST ||
            error_code == ECLOSE ||
            error_code == ELOGOFF ||
            error_code == ELIMIT) {
            ServerPrivateAccessor(server).AddError();
        }
    }

    if (_controller.IsCloseConnection() ||
        // seq_id is not read yet, no valid response can be sent back
        !_controller.has_log_id()) {
        sock->SetFailed();
        return;
    }

    const std::string& method_name = _controller.thrift_method_name();
    if (method_name.empty() || method_name[0] == ' ') {
        _controller.SetFailed(ENOMETHOD, "Invalid thrift_method_name!");
    }
    if (method_name.size() > MAX_THRIFT_METHOD_NAME_LENGTH) {
        _controller.SetFailed(ENOMETHOD, "thrift_method_name is too long");
    }
    if (_controller.log_id() > (uint64_t)0xffffffff) {
        _controller.SetFailed(ERESPONSE, "Invalid thrift seq_id=%" PRIu64,
                              _controller.log_id());
    }
    const uint32_t seq_id = (uint32_t)_controller.log_id();

    butil::IOBuf write_buf;

    // The following code was taken and modified from thrift auto generated code
    if (_controller.Failed()) {
        auto out_buffer =
            THRIFT_STDCXX::make_shared<apache::thrift::transport::TMemoryBuffer>();
        apache::thrift::protocol::TBinaryProtocolT<apache::thrift::transport::TMemoryBuffer> oprot(out_buffer);
        ::apache::thrift::TApplicationException x(_controller.ErrorText());
        oprot.writeMessageBegin(
            method_name, ::apache::thrift::protocol::T_EXCEPTION, seq_id);
        x.write(&oprot);
        oprot.writeMessageEnd();
        oprot.getTransport()->writeEnd();
        oprot.getTransport()->flush();

        uint8_t* buf;
        uint32_t sz;
        out_buffer->getBuffer(&buf, &sz);
        const thrift_head_t head = { htonl(sz) };
        write_buf.append(&head, sizeof(head));
        write_buf.append(buf, sz);
    } else if (_response.raw_instance()) {
        auto out_buffer =
            THRIFT_STDCXX::make_shared<apache::thrift::transport::TMemoryBuffer>();
        apache::thrift::protocol::TBinaryProtocolT<apache::thrift::transport::TMemoryBuffer> oprot(out_buffer);
        oprot.writeMessageBegin(
            method_name, ::apache::thrift::protocol::T_REPLY, seq_id);

        uint32_t xfer = 0;
        xfer += oprot.writeStructBegin("rpc_result"); // can be any valid name
        xfer += oprot.writeFieldBegin("success",
                                      ::apache::thrift::protocol::T_STRUCT,
                                      THRIFT_RESPONSE_FID);
        xfer += _response.raw_instance()->Write(&oprot);
        xfer += oprot.writeFieldEnd();
        xfer += oprot.writeFieldStop();
        xfer += oprot.writeStructEnd();

        oprot.writeMessageEnd();
        oprot.getTransport()->writeEnd();
        oprot.getTransport()->flush();

        uint8_t* buf;
        uint32_t sz;
        out_buffer->getBuffer(&buf, &sz);
        const thrift_head_t head = { htonl(sz) };
        write_buf.append(&head, sizeof(head));
        write_buf.append(buf, sz);
    } else {
        const size_t mb_size = ThriftMessageBeginSize(method_name);
        char buf[sizeof(thrift_head_t) + mb_size];
        // suppress strict-aliasing warning
        thrift_head_t* head = (thrift_head_t*)buf;
        head->body_len = htonl(mb_size + _response.body.size());
        WriteThriftMessageBegin(buf + sizeof(thrift_head_t), method_name,
                                ::apache::thrift::protocol::T_REPLY, seq_id);
        write_buf.append(buf, sizeof(buf));
        write_buf.append(_response.body.movable());
    }
    
    if (span) {
        span->set_response_size(write_buf.size());
    }
    // Have the risk of unlimited pending responses, in which case, tell
    // users to set max_concurrency.
    Socket::WriteOptions wopt;
    wopt.ignore_eovercrowded = true;
    if (sock->Write(&write_buf, &wopt) != 0) {
        const int errcode = errno;
        PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
        _controller.SetFailed(errcode, "Fail to write into %s",
                              sock->description().c_str());
        return;
    }

    if (span) {
        // TODO: this is not sent
        span->set_sent_us(butil::cpuwide_time_us());
    }
}