void Controller::IssueRPC()

in src/brpc/controller.cpp [1044:1259]
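
Overview: this function sends out the request of the current call (first try,
retry, or backup request). It recomputes the remaining timeout, picks a target
server (directly for a single-server channel, or via the load balancer),
obtains a sending socket according to the connection type, packs and, when an
authenticator is set, authenticates the request, and finally writes it to the
socket.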


void Controller::IssueRPC(int64_t start_realtime_us) {
    _current_call.begin_time_us = start_realtime_us;
    
    // For a retry or backup request, recalculate the remaining timeout.
    if (_real_timeout_ms > 0) {
        _real_timeout_ms -= (start_realtime_us - _begin_time_us) / 1000;
    }

    // Clear the last error. Don't clear _error_text because we append to it.
    _error_code = 0;

    // Make versioned correlation_id.
    // call_id         : unversioned, mainly for ECANCELED and ERPCTIMEDOUT
    // call_id + 1     : first try.
    // call_id + 2     : retry 1
    // ...
    // call_id + N + 1 : retry N
    // All ids except call_id are versioned. Say we've sent retry 1 and a
    // failed response to the first try comes back: it will be ignored.
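    // Illustration with hypothetical numbers: if call_id is 100 and two
    // retries happen, versions 101, 102 and 103 are used in turn; a stale
    // response carrying version 101 that arrives after retry 1 has started
    // is dropped because the current version is already 102.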
    const CallId cid = current_id();

    // Intercept IssueRPC when _sender is set. Currently _sender is only set
    // by SelectiveChannel.
    if (_sender) {
        if (_sender->IssueRPC(start_realtime_us) != 0) {
            return HandleSendFailed();
        }
        CHECK_EQ(0, bthread_id_unlock(cid));
        return;
    }

    // Pick a target server for sending the RPC
    _current_call.need_feedback = false;
    _current_call.enable_circuit_breaker = has_enabled_circuit_breaker();
    SocketUniquePtr tmp_sock;
    if (SingleServer()) {
        // Don't use _current_call.peer_id which is set to -1 after construction
        // of the backup call.
        const int rc = Socket::Address(_single_server_id, &tmp_sock);
        if (rc != 0 || (!is_health_check_call() && !tmp_sock->IsAvailable())) {
            SetFailed(EHOSTDOWN, "Not connected to %s yet, server_id=%" PRIu64,
                      endpoint2str(_remote_side).c_str(), _single_server_id);
            tmp_sock.reset();  // Release ref ASAP
            return HandleSendFailed();
        }
        _current_call.peer_id = _single_server_id;
    } else {
        LoadBalancer::SelectIn sel_in =
            { start_realtime_us, true,
              has_request_code(), _request_code, _accessed };
        LoadBalancer::SelectOut sel_out(&tmp_sock);
        const int rc = _lb->SelectServer(sel_in, &sel_out);
        if (rc != 0) {
            std::ostringstream os;
            DescribeOptions opt;
            opt.verbose = false;
            _lb->Describe(os, opt);
            SetFailed(rc, "Fail to select server from %s", os.str().c_str());
            return HandleSendFailed();
        }
        _current_call.need_feedback = sel_out.need_feedback;
        _current_call.peer_id = tmp_sock->id();
        // NOTE: _remote_side must be set here because _pack_request below
        // may need it (e.g. http may set "Host" to _remote_side)
        // Don't set _local_side here because tmp_sock may not be connected
        // yet.
        _remote_side = tmp_sock->remote_side();
    }
    if (_stream_creator) {
        _current_call.stream_user_data =
            _stream_creator->OnCreatingStream(&tmp_sock, this);
        if (FailedInline()) {
            return HandleSendFailed();
        }
        // remote_side can't be changed.
        CHECK_EQ(_remote_side, tmp_sock->remote_side());
    }

    Span* span = _span;
    if (span) {
        if (_current_call.nretry == 0) {
            span->set_remote_side(_remote_side);
        } else {
            span->Annotate("Retrying %s",
                           endpoint2str(_remote_side).c_str());
        }
    }
    // Handle connection type
    if (_connection_type == CONNECTION_TYPE_SINGLE ||
        _stream_creator != NULL) { // let the user decide the sending_sock
        // directly in the callback (according to connection_type)
        _current_call.sending_sock.reset(tmp_sock.release());
        // TODO(gejun): Setting the preferred index of a single-connected
        // socket has two issues:
        //   1. Race conditions. If a preferred_index set by one thread is
        //      overwritten by another, the response coming back has to check
        //      protocols one by one. This is a performance issue; correctness
        //      is unaffected.
        //   2. Thrashing between different protocols. Also a performance issue.
        _current_call.sending_sock->set_preferred_index(_preferred_index);
    } else {
        int rc = 0;
        if (_connection_type == CONNECTION_TYPE_POOLED) {
            rc = tmp_sock->GetPooledSocket(&_current_call.sending_sock);
        } else if (_connection_type == CONNECTION_TYPE_SHORT) {
            rc = tmp_sock->GetShortSocket(&_current_call.sending_sock);
        } else {
            tmp_sock.reset();
            SetFailed(EINVAL, "Invalid connection_type=%d", (int)_connection_type);
            return HandleSendFailed();
        }
        if (rc) {
            tmp_sock.reset();
            SetFailed(rc, "Fail to get %s connection",
                      ConnectionTypeToString(_connection_type));
            return HandleSendFailed();
        }
        // Remember the preferred protocol for non-single connections. When
        // the response comes back, InputMessenger calls the right handler
        // without trying other protocols. This is a must for (many) protocols
        // that can't be unambiguously distinguished from others.
        _current_call.sending_sock->set_preferred_index(_preferred_index);
        // Set preferred_index of main_socket as well to make it easier to
        // debug and observe from /connections.
        if (tmp_sock->preferred_index() < 0) {
            tmp_sock->set_preferred_index(_preferred_index);
        }
        tmp_sock.reset();
    }
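    // (Roughly speaking: SINGLE multiplexes all calls on one connection per
    // server, POOLED takes an idle connection from a pool for this call, and
    // SHORT opens a connection that is closed once the call finishes.)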
    if (_tos > 0) {
        _current_call.sending_sock->set_type_of_service(_tos);
    }
    if (is_response_read_progressively()) {
        // Tag the socket so that when the response comes back, the parser
        // will stop before reading the whole body.
        _current_call.sending_sock->read_will_be_progressive(_connection_type);
    }

    // Handle authentication
    const Authenticator* using_auth = NULL;
    if (_auth != NULL) {
        // Only one thread will be the winner and get the right to pack
        // authentication information; the others wait until the request
        // is sent.
        int auth_error = 0;
        if (_current_call.sending_sock->FightAuthentication(&auth_error) == 0) {
            using_auth = _auth;
        } else if (auth_error != 0) {
            SetFailed(auth_error, "Fail to authenticate, %s",
                      berror(auth_error));
            return HandleSendFailed();
        }
    }
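    // Illustration: if several calls race on a freshly created connection,
    // one of them wins FightAuthentication() and packs the credential into
    // its request; the others wait inside FightAuthentication() until
    // SetAuthentication() below publishes the outcome.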
    // Make request
    butil::IOBuf packet;
    SocketMessage* user_packet = NULL;
    _pack_request(&packet, &user_packet, cid.value, _method, this,
                  _request_buf, using_auth);
    // TODO: PackRequest may accept SocketMessagePtr<>?
    SocketMessagePtr<> user_packet_guard(user_packet);
    if (FailedInline()) {
        // The controller should already have been SetFailed.
        if (using_auth) {
            // Don't forget to signal waiters on authentication
            _current_call.sending_sock->SetAuthentication(ErrorCode());
        }
        return HandleSendFailed();
    }

    timespec connect_abstime;
    timespec* pabstime = NULL;
    if (_connect_timeout_ms > 0) {
        if (_deadline_us >= 0) {
            connect_abstime = butil::microseconds_to_timespec(
                std::min(_connect_timeout_ms * 1000L + start_realtime_us,
                         _deadline_us));
        } else {
            connect_abstime = butil::microseconds_to_timespec(
                _connect_timeout_ms * 1000L + start_realtime_us);
        }
        pabstime = &connect_abstime;
    }
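    // Example with hypothetical numbers: with connect_timeout_ms = 200 but
    // only 150ms left before _deadline_us, a pending connect must finish
    // within 150ms; without a deadline it gets the full 200ms.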
    Socket::WriteOptions wopt;
    wopt.id_wait = cid;
    wopt.abstime = pabstime;
    wopt.pipelined_count = _pipelined_count;
    wopt.auth_flags = _auth_flags;
    wopt.ignore_eovercrowded = has_flag(FLAGS_IGNORE_EOVERCROWDED);
    wopt.write_in_background = write_to_socket_in_background();
    int rc;
    size_t packet_size = 0;
    if (user_packet_guard) {
        if (span) {
            packet_size = user_packet_guard->EstimatedByteSize();
        }
        rc = _current_call.sending_sock->Write(user_packet_guard, &wopt);
    } else {
        packet_size = packet.size();
        rc = _current_call.sending_sock->Write(&packet, &wopt);
    }
    if (span) {
        if (_current_call.nretry == 0) {
            span->set_sent_us(butil::cpuwide_time_us());
            span->set_request_size(packet_size);
        } else {
            span->Annotate("Requested(%lld) [%d]",
                           (long long)packet_size, _current_call.nretry + 1);
        }
    }
    if (using_auth) {
        // For performance reasons, we set authentication immediately after
        // the first `Write' returns instead of waiting for the server to
        // confirm the credential data.
        _current_call.sending_sock->SetAuthentication(rc);
    }
    CHECK_EQ(0, bthread_id_unlock(cid));
}
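
As a side note, the remaining-timeout arithmetic at the top of the function can
be illustrated with a minimal standalone sketch. The helper name and the numbers
below are hypothetical and not part of brpc:

#include <cstdint>
#include <cstdio>

// Hypothetical helper mirroring the arithmetic at the top of IssueRPC():
// every retry/backup request subtracts the time already elapsed since the
// RPC began, so the overall deadline stays fixed across attempts.
int64_t RemainingTimeoutMs(int64_t real_timeout_ms,
                           int64_t begin_time_us,
                           int64_t start_realtime_us) {
    if (real_timeout_ms > 0) {
        real_timeout_ms -= (start_realtime_us - begin_time_us) / 1000;
    }
    return real_timeout_ms;
}

int main() {
    const int64_t begin_us = 1000000;   // RPC began at t = 1s
    const int64_t retry_us = 1300000;   // retry issued 300ms later
    // With a 1000ms budget, 700ms remains for the retry.
    std::printf("%lld ms left\n",
                (long long)RemainingTimeoutMs(1000, begin_us, retry_us));
    return 0;
}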