void SendRpcResponse()

in src/brpc/policy/baidu_rpc_protocol.cpp [263:449]


// Builds the response of one RPC (RpcMeta + optional serialized body +
// optional attachment) and writes it to the controller's sending socket,
// either through SendStreamData() when the request carried a remote stream
// or through Socket::Write() otherwise.
//
// Whatever path is taken (close-connection, write failure, success), the
// BRPC_SCOPE_EXIT block declared near the top runs when the function is left
// and recycles `cntl' and `messages'.
//
// Params:
//   correlation_id  copied verbatim into the response meta.
//   cntl            controller of this RPC. It is handed to a
//                   std::unique_ptr<Controller, LogErrorTextAndDelete> in the
//                   scope-exit block, so callers must not use it afterwards.
//   messages        request/response pair; may be NULL, in which case no
//                   body is serialized and nothing is returned to a factory.
//   server          only consulted (via options()) to decide how `messages'
//                   is returned; not dereferenced when `messages' is NULL.
//   method_status,
//   received_us     forwarded to ConcurrencyRemover in the scope-exit block.
void SendRpcResponse(int64_t correlation_id, Controller* cntl,
                     RpcPBMessages* messages, const Server* server,
                     MethodStatus* method_status, int64_t received_us) {
    ControllerPrivateAccessor accessor(cntl);
    Span* span = accessor.span();
    if (span) {
        // Timestamp taken before any serialization work below.
        span->set_start_send_us(butil::cpuwide_time_us());
    }
    Socket* sock = accessor.get_sending_socket();

    // Both stay NULL when there is no `messages' object at all.
    const google::protobuf::Message* req = NULL == messages ? NULL : messages->Request();
    const google::protobuf::Message* res = NULL == messages ? NULL : messages->Response();

    // Recycle resources at the end of this function.
    BRPC_SCOPE_EXIT {
        {
            // Remove concurrency and record latency at first.
            // The inner braces end the lifetime of the remover right here,
            // i.e. before the controller/messages are recycled below.
            ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);
        }

        // Declared before the remaining statements so that its deleter runs
        // last, after CallAfterRpcResp() and after `messages' was returned.
        std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);

        if (NULL == messages) {
            // Nothing to hand back; this only leaves the scope-exit body.
            return;
        }

        cntl->CallAfterRpcResp(req, res);
        // `messages' goes back to the server's message factory unless a
        // baidu_master_service is configured, in which case it is treated as
        // a BaiduProxyPBMessages.
        if (NULL == server->options().baidu_master_service) {
            server->options().rpc_pb_message_factory->Return(messages);
        } else {
            BaiduProxyPBMessages::Return(static_cast<BaiduProxyPBMessages*>(messages));
        }
    };
    
    StreamIds response_stream_ids = accessor.response_streams();

    // The controller asked to close the connection: close every response
    // stream, fail the socket and leave without writing any response.
    if (cntl->IsCloseConnection()) {
        for(size_t i = 0; i < response_stream_ids.size(); ++i) {
            StreamClose(response_stream_ids[i]);
        }
        sock->SetFailed();
        return;
    }
    bool append_body = false;
    butil::IOBuf res_body;
    // `res' can be NULL here, in which case we don't serialize it
    // If user calls `SetFailed' on Controller, we don't serialize
    // response either
    CompressType compress_type = cntl->response_compress_type();
    if (res != NULL && !cntl->Failed()) {
        append_body = SerializeResponse(*res, *cntl, res_body);
    }

    // Don't use res->ByteSize() since it may be compressed
    // Both sizes stay 0 when no body is appended, so the attachment is only
    // sent together with a successfully serialized response.
    size_t res_size = 0;
    size_t attached_size = 0;
    if (append_body) {
        res_size = res_body.length();
        attached_size = cntl->response_attachment().length();
    }

    int error_code = cntl->ErrorCode();
    if (error_code == -1) {
        // replace general error (-1) with INTERNAL_SERVER_ERROR to make a
        // distinction between server error and client error
        error_code = EINTERNAL;
    }
    // Fill the response meta: error info, correlation id, compression,
    // content type and (when present) the attachment size.
    RpcMeta meta;
    RpcResponseMeta* response_meta = meta.mutable_response();
    response_meta->set_error_code(error_code);
    if (!cntl->ErrorText().empty()) {
        // Only set error_text when it's not empty since protobuf Message
        // always new the string no matter if it's empty or not.
        response_meta->set_error_text(cntl->ErrorText());
    }
    meta.set_correlation_id(correlation_id);
    meta.set_compress_type(compress_type);
    meta.set_content_type(cntl->response_content_type());
    if (attached_size > 0) {
        meta.set_attachment_size(attached_size);
    }
    // The first response stream is the primary one: its settings go into the
    // meta and the remaining ids are listed as extra_stream_ids.
    // Note that response_stream_id keeps the first id even when that stream
    // can no longer be addressed (only a warning is logged in that case).
    StreamId response_stream_id = INVALID_STREAM_ID;
    SocketUniquePtr stream_ptr;
    if (!response_stream_ids.empty()) {
        response_stream_id = response_stream_ids[0];
        if (Socket::Address(response_stream_id, &stream_ptr) == 0) {
            Stream* s = (Stream *) stream_ptr->conn();
            StreamSettings *stream_settings = meta.mutable_stream_settings();
            s->FillSettings(stream_settings);
            s->SetHostSocket(sock);
            for (size_t i = 1; i < response_stream_ids.size(); ++i) {
                stream_settings->mutable_extra_stream_ids()->Add(response_stream_ids[i]);
            }
        } else {
            LOG(WARNING) << "Stream=" << response_stream_id 
                         << " was closed before sending response";
        }
    }

    // Copy user-defined response fields into the meta, if there are any.
    if (cntl->has_response_user_fields() &&
        !cntl->response_user_fields()->empty()) {
        ::google::protobuf::Map<std::string, std::string>& user_fields
            = *meta.mutable_user_fields();
        user_fields.insert(cntl->response_user_fields()->begin(),
                           cntl->response_user_fields()->end());

    }

    // Assemble the wire buffer: header + meta first (with the payload size
    // being body + attachment), then the body and the attachment themselves.
    butil::IOBuf res_buf;
    SerializeRpcHeaderAndMeta(&res_buf, meta, res_size + attached_size);
    if (append_body) {
        res_buf.append(res_body.movable());
        if (attached_size > 0) {
            res_buf.append(cntl->response_attachment().movable());
        }
    }

    // When tracing is enabled, create a bthread_id bound to `args' and
    // HandleResponseWritten; it is joined at the end of this function before
    // args.sent_us is read. response_id stays invalid without a span.
    ResponseWriteInfo args;
    bthread_id_t response_id = INVALID_BTHREAD_ID;
    if (span) {
        span->set_response_size(res_buf.size());
        CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten));
    }

    // Send rpc response over stream even if server side failed to create
    // stream for some reason.
    if (cntl->has_remote_stream()) {
        // Send the response over stream to notify that this stream connection
        // is successfully built.
        // Response_stream can be INVALID_STREAM_ID when error occurs.
        if (SendStreamData(sock, &res_buf,
                           accessor.remote_stream_settings()->stream_id(),
                           response_stream_id, response_id) != 0) {
            // Save errno immediately, before logging or anything else can
            // overwrite it. EPIPE is not logged.
            error_code = errno;
            PLOG_IF(WARNING, error_code != EPIPE)
                << "Fail to write into " << sock->description();
            cntl->SetFailed(error_code,  "Fail to write into %s",
                            sock->description().c_str());
            // Propagate the failure to every server-side response stream.
            Stream::SetFailed(response_stream_ids, error_code,
                              "Fail to write into %s",
                              sock->description().c_str());
            // NOTE(review): with a span, response_id is not joined on this
            // path; presumably the failed send releases it -- confirm.
            return;
        }

        // Now it's ok to mark these server-side streams as connected as all the
        // written user data would follow the RPC response.
        // Reuse stream_ptr to avoid address first stream id again
        if (stream_ptr) {
            ((Stream*)stream_ptr->conn())->SetConnected();
        }
        // Extra streams get their host socket and connected state only now,
        // after the response was handed to the socket; streams that can no
        // longer be addressed are skipped with a warning.
        for (size_t i = 1; i < response_stream_ids.size(); ++i) {
            StreamId extra_stream_id = response_stream_ids[i];
            SocketUniquePtr extra_stream_ptr;
            if (Socket::Address(extra_stream_id, &extra_stream_ptr) == 0) {
                Stream* extra_stream = (Stream *) extra_stream_ptr->conn();
                extra_stream->SetHostSocket(sock);
                extra_stream->SetConnected();
            } else {
                LOG(WARNING) << "Stream=" << extra_stream_id
                             << " was closed before sending response";
            }
        }
    } else{
        // Have the risk of unlimited pending responses, in which case, tell
        // users to set max_concurrency.
        Socket::WriteOptions wopt;
        wopt.ignore_eovercrowded = true;
        // Only ask for a write notification when a span created response_id.
        if (INVALID_BTHREAD_ID != response_id) {
            wopt.id_wait = response_id;
            wopt.notify_on_success = true;
        }
        if (sock->Write(&res_buf, &wopt) != 0) {
            // Capture errno right away; EPIPE is not logged.
            const int errcode = errno;
            PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
            cntl->SetFailed(errcode, "Fail to write into %s",
                            sock->description().c_str());
            // NOTE(review): with a span, response_id is not joined on this
            // path; presumably the failed write releases it -- confirm.
            return;
        }
    }

    if (span) {
        // Wait on response_id, then record the time stored in `args'.
        bthread_id_join(response_id);
        // Do not care about the result of background writing.
        // TODO: this is not sent
        span->set_sent_us(args.sent_us);
    }
}