in src/brpc/channel.cpp [412:595]
// Issues one RPC through this channel.
//
// In order, this method:
//   1. fills controller fields the caller left unset from the channel's
//      options (max_retry, timeout, backup request, connection type, ...);
//   2. locks the controller's call id and resets its version range;
//   3. optionally creates a client tracing span;
//   4. serializes the request into cntl->_request_buf;
//   5. arms either a backup-request timer or a timeout timer;
//   6. calls Controller::IssueRPC();
//   7. when `done` is NULL, waits in Join() for the RPC to finish.
//
// Parameters:
//   method          - descriptor of the called method. The span-naming code
//                     below tolerates NULL (a placeholder name is used).
//   controller_base - cast to brpc::Controller with static_cast, i.e. without
//                     any runtime type check.
//   request         - handed to _serialize_request.
//   response        - stored in the controller as cntl->_response.
//   done            - completion callback. NULL selects the synchronous path
//                     (this method joins on the call id before returning).
//
// Failure handling: if the call id cannot be locked, the error is recorded on
// the controller and `done` (if any) is run in place. Every later failure
// (serialization, user-code limit, timer creation) is recorded with
// SetFailed() where needed and finished through cntl->HandleSendFailed().
void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller_base,
const google::protobuf::Message* request,
google::protobuf::Message* response,
google::protobuf::Closure* done) {
// Wall-clock start time. Reused below for OnRPCBegin(), for computing
// _deadline_us and timer expirations, and as the argument of IssueRPC().
const int64_t start_send_real_us = butil::gettimeofday_us();
// Unchecked downcast: the caller must pass a brpc::Controller.
Controller* cntl = static_cast<Controller*>(controller_base);
cntl->OnRPCBegin(start_send_real_us);
// Override max_retry first to reset the range of correlation_id
if (cntl->max_retry() == UNSET_MAGIC_NUM) {
cntl->set_max_retry(_options.max_retry);
}
if (cntl->max_retry() < 0) {
// this is important because #max_retry decides #versions allocated
// in correlation_id. negative max_retry causes undefined behavior.
cntl->set_max_retry(0);
}
// HTTP needs this field to be set before any SetFailed()
cntl->_request_protocol = _options.protocol;
if (_options.protocol.has_param()) {
// The channel's protocol parameter is copied into the controller, which
// is expected not to carry one already.
CHECK(cntl->protocol_param().empty());
cntl->protocol_param() = _options.protocol.param();
}
// For HTTP channels created with an "http"/"https" scheme, default the
// request URI's host (and port) to _service_name when the caller has not set
// a host and a service name is available.
if (_options.protocol == brpc::PROTOCOL_HTTP && (_scheme == "https" || _scheme == "http")) {
URI& uri = cntl->http_request().uri();
if (uri.host().empty() && !_service_name.empty()) {
uri.SetHostAndPort(_service_name);
}
}
cntl->_preferred_index = _preferred_index;
cntl->_retry_policy = _options.retry_policy;
if (_options.enable_circuit_breaker) {
cntl->add_flag(Controller::FLAGS_ENABLED_CIRCUIT_BREAKER);
}
// Lock the call id and reset its range to 2 + max_retry. max_retry was
// clamped to >= 0 above, so the range is at least 2.
const CallId correlation_id = cntl->call_id();
const int rc = bthread_id_lock_and_reset_range(
correlation_id, NULL, 2 + cntl->max_retry());
if (rc != 0) {
// EINVAL is the only error code expected from the lock call here.
CHECK_EQ(EINVAL, rc);
// Keep an error that is already recorded on the controller; only set
// EINVAL when the controller has not failed yet.
if (!cntl->FailedInline()) {
cntl->SetFailed(EINVAL, "Fail to lock call_id=%" PRId64,
correlation_id.value);
}
LOG_IF(ERROR, cntl->is_used_by_rpc())
<< "Controller=" << cntl << " was used by another RPC before. "
"Did you forget to Reset() it before reuse?";
// Have to run done in-place. If the done runs in another thread,
// Join() on this RPC is no-op and probably ends earlier than running
// the callback and releases resources used in the callback.
// Since this branch is only entered by wrongly-used RPC, the
// potentially introduced deadlock(caused by locking RPC and done with
// the same non-recursive lock) is acceptable and removable by fixing
// user's code.
if (done) {
done->Run();
}
return;
}
// Mark the controller as used; this is the flag the "used by another RPC"
// diagnostic above checks on a later call.
cntl->set_used_by_rpc();
// Create a client span only when the controller has no _sender and the
// thread-local parent span is traceable.
// NOTE(review): presumably a non-NULL _sender marks a sub-call whose owner
// handles tracing itself -- confirm against Controller.
if (cntl->_sender == NULL && IsTraceable(Span::tls_parent())) {
const int64_t start_send_us = butil::cpuwide_time_us();
// Span name, in order of preference: the channel's custom
// _get_method_name hook, the descriptor's full name, or a fixed
// placeholder when no descriptor was given.
const std::string* method_name = NULL;
if (_get_method_name) {
method_name = &_get_method_name(method, cntl);
} else if (method) {
method_name = &method->full_name();
} else {
const static std::string NULL_METHOD_STR = "null-method";
method_name = &NULL_METHOD_STR;
}
// The second argument is the difference between the wall-clock timestamp
// and the cpuwide timestamp taken for this call.
// NOTE(review): `span` is dereferenced without a NULL check -- confirm
// that Span::CreateClientSpan can never return NULL.
Span* span = Span::CreateClientSpan(
*method_name, start_send_real_us - start_send_us);
span->set_log_id(cntl->log_id());
span->set_base_cid(correlation_id);
span->set_protocol(_options.protocol);
span->set_start_send_us(start_send_us);
cntl->_span = span;
}
// Override some options if they haven't been set by Controller
if (cntl->timeout_ms() == UNSET_MAGIC_NUM) {
cntl->set_timeout_ms(_options.timeout_ms);
}
// Since connection is shared extensively amongst channels and RPC,
// overriding connect_timeout_ms does not make sense, just use the
// one in ChannelOptions
cntl->_connect_timeout_ms = _options.connect_timeout_ms;
// The channel's backup-request settings apply only when the controller has
// neither a backup_request_ms nor a backup request policy of its own.
if (cntl->backup_request_ms() == UNSET_MAGIC_NUM &&
NULL == cntl->_backup_request_policy) {
cntl->set_backup_request_ms(_options.backup_request_ms);
cntl->_backup_request_policy = _options.backup_request_policy;
}
if (cntl->connection_type() == CONNECTION_TYPE_UNKNOWN) {
cntl->set_connection_type(_options.connection_type);
}
// Record the per-call state on the controller.
cntl->_response = response;
cntl->_done = done;
cntl->_pack_request = _pack_request;
cntl->_method = method;
cntl->_auth = _options.auth;
// A single-server channel records its fixed server id and address on the
// controller.
if (SingleServer()) {
cntl->_single_server_id = _server_id;
cntl->_remote_side = _server_address;
}
// Share the lb with controller.
cntl->_lb = _lb;
// Ensure that serialize_request is done before pack_request in all
// possible executions, including:
// HandleSendFailed => OnVersionedRPCReturned => IssueRPC(pack_request)
_serialize_request(&cntl->_request_buf, cntl, request);
if (cntl->FailedInline()) {
// Handle failures caused by serialize_request, and these error_codes
// should be excluded from the retry_policy.
return cntl->HandleSendFailed();
}
// With -usercode_in_pthread on, asynchronous calls (done != NULL) are
// rejected with ELIMIT when TooManyUserCode() reports the limit is reached.
if (FLAGS_usercode_in_pthread &&
done != NULL &&
TooManyUserCode()) {
cntl->SetFailed(ELIMIT, "Too many user code to run when "
"-usercode_in_pthread is on");
return cntl->HandleSendFailed();
}
if (!cntl->_request_streams.empty()) {
// Currently we cannot handle retry and backup request correctly
// for calls carrying request streams, so both are disabled here.
cntl->set_max_retry(0);
cntl->set_backup_request_ms(-1);
cntl->_backup_request_policy = NULL;
}
// Timer selection:
//   - backup request enabled (>= 0) and due before the timeout, or there is
//     no timeout (< 0): arm the backup-request timer;
//   - otherwise, with a non-negative timeout: arm the timeout timer;
//   - otherwise: no timer, and _deadline_us is -1.
if (cntl->backup_request_ms() >= 0 &&
(cntl->backup_request_ms() < cntl->timeout_ms() ||
cntl->timeout_ms() < 0)) {
// Setup timer for backup request. When it occurs, we'll setup a
// timer of timeout_ms before sending backup request.
// _deadline_us is for truncating _connect_timeout_ms and resetting
// timer when EBACKUPREQUEST occurs.
if (cntl->timeout_ms() < 0) {
cntl->_deadline_us = -1;
} else {
cntl->_deadline_us = cntl->timeout_ms() * 1000L + start_send_real_us;
}
// The timer fires at start time + backup_request_ms (converted to
// microseconds); the call id value is passed as the callback argument.
const int rc = bthread_timer_add(
&cntl->_timeout_id,
butil::microseconds_to_timespec(
cntl->backup_request_ms() * 1000L + start_send_real_us),
HandleBackupRequest, (void*)correlation_id.value);
if (BAIDU_UNLIKELY(rc != 0)) {
cntl->SetFailed(rc, "Fail to add timer for backup request");
return cntl->HandleSendFailed();
}
} else if (cntl->timeout_ms() >= 0) {
// Setup timer for RPC timeout
// _deadline_us is for truncating _connect_timeout_ms
cntl->_deadline_us = cntl->timeout_ms() * 1000L + start_send_real_us;
// The timer fires at the deadline; the call id value is passed as the
// callback argument.
const int rc = bthread_timer_add(
&cntl->_timeout_id,
butil::microseconds_to_timespec(cntl->_deadline_us),
HandleTimeout, (void*)correlation_id.value);
if (BAIDU_UNLIKELY(rc != 0)) {
cntl->SetFailed(rc, "Fail to add timer for timeout");
return cntl->HandleSendFailed();
}
} else {
// No timeout and no applicable backup request: -1 marks "no deadline".
cntl->_deadline_us = -1;
}
cntl->IssueRPC(start_send_real_us);
// Only the synchronous path touches cntl after IssueRPC().
// NOTE(review): presumably an asynchronous `done` may already have run and
// released the controller by this point -- confirm before adding any code
// that uses cntl outside the branch below.
if (done == NULL) {
// MUST wait for response when sending synchronous RPC. It will
// be woken up by callback when RPC finishes (succeeds or still
// fails after retry)
Join(correlation_id);
if (cntl->_span) {
cntl->SubmitSpan();
}
cntl->OnRPCEnd(butil::gettimeofday_us());
}
}