in src/brpc/controller.cpp [1044:1259]
void Controller::IssueRPC(int64_t start_realtime_us) {
_current_call.begin_time_us = start_realtime_us;
// If has retry/backup request,we will recalculate the timeout,
if (_real_timeout_ms > 0) {
_real_timeout_ms -= (start_realtime_us - _begin_time_us) / 1000;
}
// Clear last error, Don't clear _error_text because we append to it.
_error_code = 0;
// Make versioned correlation_id.
// call_id : unversioned, mainly for ECANCELED and ERPCTIMEDOUT
// call_id + 1 : first try.
// call_id + 2 : retry 1
// ...
// call_id + N + 1 : retry N
// All ids except call_id are versioned. Say if we've sent retry 1 and
// a failed response of first try comes back, it will be ignored.
const CallId cid = current_id();
// Intercept IssueRPC when _sender is set. Currently _sender is only set
// by SelectiveChannel.
if (_sender) {
if (_sender->IssueRPC(start_realtime_us) != 0) {
return HandleSendFailed();
}
CHECK_EQ(0, bthread_id_unlock(cid));
return;
}
// Pick a target server for sending RPC
_current_call.need_feedback = false;
_current_call.enable_circuit_breaker = has_enabled_circuit_breaker();
SocketUniquePtr tmp_sock;
if (SingleServer()) {
// Don't use _current_call.peer_id which is set to -1 after construction
// of the backup call.
const int rc = Socket::Address(_single_server_id, &tmp_sock);
if (rc != 0 || (!is_health_check_call() && !tmp_sock->IsAvailable())) {
SetFailed(EHOSTDOWN, "Not connected to %s yet, server_id=%" PRIu64,
endpoint2str(_remote_side).c_str(), _single_server_id);
tmp_sock.reset(); // Release ref ASAP
return HandleSendFailed();
}
_current_call.peer_id = _single_server_id;
} else {
LoadBalancer::SelectIn sel_in =
{ start_realtime_us, true,
has_request_code(), _request_code, _accessed };
LoadBalancer::SelectOut sel_out(&tmp_sock);
const int rc = _lb->SelectServer(sel_in, &sel_out);
if (rc != 0) {
std::ostringstream os;
DescribeOptions opt;
opt.verbose = false;
_lb->Describe(os, opt);
SetFailed(rc, "Fail to select server from %s", os.str().c_str());
return HandleSendFailed();
}
_current_call.need_feedback = sel_out.need_feedback;
_current_call.peer_id = tmp_sock->id();
// NOTE: _remote_side must be set here because _pack_request below
// may need it (e.g. http may set "Host" to _remote_side)
// Don't set _local_side here because tmp_sock may be not connected
// here.
_remote_side = tmp_sock->remote_side();
}
if (_stream_creator) {
_current_call.stream_user_data =
_stream_creator->OnCreatingStream(&tmp_sock, this);
if (FailedInline()) {
return HandleSendFailed();
}
// remote_side can't be changed.
CHECK_EQ(_remote_side, tmp_sock->remote_side());
}
Span* span = _span;
if (span) {
if (_current_call.nretry == 0) {
span->set_remote_side(_remote_side);
} else {
span->Annotate("Retrying %s",
endpoint2str(_remote_side).c_str());
}
}
// Handle connection type
if (_connection_type == CONNECTION_TYPE_SINGLE ||
_stream_creator != NULL) { // let user decides the sending_sock
// in the callback(according to connection_type) directly
_current_call.sending_sock.reset(tmp_sock.release());
// TODO(gejun): Setting preferred index of single-connected socket
// has two issues:
// 1. race conditions. If a set perferred_index is overwritten by
// another thread, the response back has to check protocols one
// by one. This is a performance issue, correctness is unaffected.
// 2. thrashing between different protocols. Also a performance issue.
_current_call.sending_sock->set_preferred_index(_preferred_index);
} else {
int rc = 0;
if (_connection_type == CONNECTION_TYPE_POOLED) {
rc = tmp_sock->GetPooledSocket(&_current_call.sending_sock);
} else if (_connection_type == CONNECTION_TYPE_SHORT) {
rc = tmp_sock->GetShortSocket(&_current_call.sending_sock);
} else {
tmp_sock.reset();
SetFailed(EINVAL, "Invalid connection_type=%d", (int)_connection_type);
return HandleSendFailed();
}
if (rc) {
tmp_sock.reset();
SetFailed(rc, "Fail to get %s connection",
ConnectionTypeToString(_connection_type));
return HandleSendFailed();
}
// Remember the preferred protocol for non-single connection. When
// the response comes back, InputMessenger calls the right handler
// w/o trying other protocols. This is a must for (many) protocols that
// can't be distinguished from other protocols w/o ambiguity.
_current_call.sending_sock->set_preferred_index(_preferred_index);
// Set preferred_index of main_socket as well to make it easier to
// debug and observe from /connections.
if (tmp_sock->preferred_index() < 0) {
tmp_sock->set_preferred_index(_preferred_index);
}
tmp_sock.reset();
}
if (_tos > 0) {
_current_call.sending_sock->set_type_of_service(_tos);
}
if (is_response_read_progressively()) {
// Tag the socket so that when the response comes back, the parser will
// stop before reading all body.
_current_call.sending_sock->read_will_be_progressive(_connection_type);
}
// Handle authentication
const Authenticator* using_auth = NULL;
if (_auth != NULL) {
// Only one thread will be the winner and get the right to pack
// authentication information, others wait until the request
// is sent.
int auth_error = 0;
if (_current_call.sending_sock->FightAuthentication(&auth_error) == 0) {
using_auth = _auth;
} else if (auth_error != 0) {
SetFailed(auth_error, "Fail to authenticate, %s",
berror(auth_error));
return HandleSendFailed();
}
}
// Make request
butil::IOBuf packet;
SocketMessage* user_packet = NULL;
_pack_request(&packet, &user_packet, cid.value, _method, this,
_request_buf, using_auth);
// TODO: PackRequest may accept SocketMessagePtr<>?
SocketMessagePtr<> user_packet_guard(user_packet);
if (FailedInline()) {
// controller should already be SetFailed.
if (using_auth) {
// Don't forget to signal waiters on authentication
_current_call.sending_sock->SetAuthentication(ErrorCode());
}
return HandleSendFailed();
}
timespec connect_abstime;
timespec* pabstime = NULL;
if (_connect_timeout_ms > 0) {
if (_deadline_us >= 0) {
connect_abstime = butil::microseconds_to_timespec(
std::min(_connect_timeout_ms * 1000L + start_realtime_us,
_deadline_us));
} else {
connect_abstime = butil::microseconds_to_timespec(
_connect_timeout_ms * 1000L + start_realtime_us);
}
pabstime = &connect_abstime;
}
Socket::WriteOptions wopt;
wopt.id_wait = cid;
wopt.abstime = pabstime;
wopt.pipelined_count = _pipelined_count;
wopt.auth_flags = _auth_flags;
wopt.ignore_eovercrowded = has_flag(FLAGS_IGNORE_EOVERCROWDED);
wopt.write_in_background = write_to_socket_in_background();
int rc;
size_t packet_size = 0;
if (user_packet_guard) {
if (span) {
packet_size = user_packet_guard->EstimatedByteSize();
}
rc = _current_call.sending_sock->Write(user_packet_guard, &wopt);
} else {
packet_size = packet.size();
rc = _current_call.sending_sock->Write(&packet, &wopt);
}
if (span) {
if (_current_call.nretry == 0) {
span->set_sent_us(butil::cpuwide_time_us());
span->set_request_size(packet_size);
} else {
span->Annotate("Requested(%lld) [%d]",
(long long)packet_size, _current_call.nretry + 1);
}
}
if (using_auth) {
// For performance concern, we set authentication to immediately
// after the first `Write' returns instead of waiting for server
// to confirm the credential data
_current_call.sending_sock->SetAuthentication(rc);
}
CHECK_EQ(0, bthread_id_unlock(cid));
}