in src/brpc/controller.cpp [595:755]
void Controller::OnVersionedRPCReturned(const CompletionInfo& info,
bool new_bthread, int saved_error) {
// TODO(gejun): Simplify call-ending code.
// Intercept previous calls
while (info.id != _correlation_id && info.id != current_id()) {
if (_unfinished_call && get_id(_unfinished_call->nretry) == info.id) {
if (!FailedInline()) {
// Continue with successful backup request.
break;
}
// Complete failed backup request.
_unfinished_call->OnComplete(this, _error_code, info.responded, false);
delete _unfinished_call;
_unfinished_call = NULL;
}
// Ignore all non-backup requests and failed backup requests.
_error_code = saved_error;
response_attachment().clear();
CHECK_EQ(0, bthread_id_unlock(info.id));
return;
}
if ((!_error_code && _retry_policy == NULL) ||
_current_call.nretry >= _max_retry) {
goto END_OF_RPC;
}
if (_error_code == EBACKUPREQUEST) {
if (NULL != _backup_request_policy && !_backup_request_policy->DoBackup(this)) {
// No need to do backup request.
_error_code = saved_error;
CHECK_EQ(0, bthread_id_unlock(info.id));
return;
}
// Reset timeout if needed
int rc = 0;
if (timeout_ms() >= 0) {
rc = bthread_timer_add(
&_timeout_id,
butil::microseconds_to_timespec(_deadline_us),
HandleTimeout, (void*)_correlation_id.value);
}
if (rc != 0) {
SetFailed(rc, "Fail to add timer");
goto END_OF_RPC;
}
if (!SingleServer()) {
if (_accessed == NULL) {
_accessed = ExcludedServers::Create(
std::min(_max_retry, RETRY_AVOIDANCE));
if (NULL == _accessed) {
SetFailed(ENOMEM, "Fail to create ExcludedServers");
goto END_OF_RPC;
}
}
_accessed->Add(_current_call.peer_id);
}
// _current_call does not end yet.
CHECK(_unfinished_call == NULL); // only one backup request now.
_unfinished_call = new (std::nothrow) Call(&_current_call);
if (_unfinished_call == NULL) {
SetFailed(ENOMEM, "Fail to new Call");
goto END_OF_RPC;
}
++_current_call.nretry;
add_flag(FLAGS_BACKUP_REQUEST);
return IssueRPC(butil::gettimeofday_us());
} else {
auto retry_policy = _retry_policy ? _retry_policy : DefaultRetryPolicy();
if (retry_policy->DoRetry(this)) {
// The error must come from _current_call because:
// * we intercepted error from _unfinished_call in OnVersionedRPCReturned
// * ERPCTIMEDOUT/ECANCELED are not retrying error by default.
CHECK_EQ(current_id(), info.id) << "error_code=" << _error_code;
if (!SingleServer()) {
if (_accessed == NULL) {
_accessed = ExcludedServers::Create(
std::min(_max_retry, RETRY_AVOIDANCE));
if (NULL == _accessed) {
SetFailed(ENOMEM, "Fail to create ExcludedServers");
goto END_OF_RPC;
}
}
_accessed->Add(_current_call.peer_id);
}
_current_call.OnComplete(this, _error_code, info.responded, false);
++_current_call.nretry;
// Clear http responses before retrying, otherwise the response may
// be mixed with older (and undefined) stuff. This is actually not
// done before r32008.
if (_http_response) {
_http_response->Clear();
}
response_attachment().clear();
// Retry backoff.
bthread::TaskGroup* g = bthread::tls_task_group;
int64_t backoff_time_us = retry_policy->GetBackoffTimeMs(this) * 1000L;
if (backoff_time_us > 0 &&
backoff_time_us < _deadline_us - butil::gettimeofday_us()) {
// No need to do retry backoff when the backoff time is longer than the remaining rpc time.
if (retry_policy->CanRetryBackoffInPthread() ||
(g && !g->is_current_pthread_task())) {
bthread_usleep(backoff_time_us);
} else {
LOG(WARNING) << "`CanRetryBackoffInPthread()' returns false, "
"skip retry backoff in pthread.";
}
}
return IssueRPC(butil::gettimeofday_us());
}
}
END_OF_RPC:
if (new_bthread && !FLAGS_usercode_in_coroutine) {
// [ Essential for -usercode_in_pthread=true ]
// When -usercode_in_pthread is on, the reserved threads (set by
// -usercode_backup_threads) may all block on bthread_id_lock in
// ProcessXXXResponse(), until the id is unlocked or destroyed which
// is run in a new thread when new_bthread is true. However since all
// workers are blocked, the created bthread will never be scheduled
// and result in deadlock.
// Make the id unlockable before creating the bthread fixes the issue.
// When -usercode_in_pthread is false, this also removes some useless
// waiting of the bthreads processing responses.
// Note[_done]: callid is destroyed after _done which possibly takes
// a lot of time, stop useless locking
// Note[cid]: When the callid needs to be destroyed in done->Run(),
// it does not mean that it will be destroyed directly in done->Run(),
// conversely the callid may still be locked/unlocked for many times
// before destroying. E.g. in slective channel, the callid is referenced
// by multiple sub-done and only destroyed by the last one. Calling
// bthread_id_about_to_destroy right here which makes the id unlockable
// anymore, is wrong. On the other hand, the combo channles setting
// FLAGS_DESTROY_CID_IN_DONE to true must be aware of
// -usercode_in_pthread and avoid deadlock by their own (TBR)
if ((FLAGS_usercode_in_pthread || _done != NULL/*Note[_done]*/) &&
!has_flag(FLAGS_DESTROY_CID_IN_DONE)/*Note[cid]*/) {
bthread_id_about_to_destroy(info.id);
}
// No need to join this bthread since RPC caller won't wake up
// (or user's done won't be called) until this bthread finishes
bthread_t bt;
bthread_attr_t attr = (FLAGS_usercode_in_pthread ?
BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
_tmp_completion_info = info;
if (bthread_start_background(&bt, &attr, RunEndRPC, this) != 0) {
LOG(FATAL) << "Fail to start bthread";
EndRPC(info);
}
} else {
if (_done != NULL/*Note[_done]*/ &&
!has_flag(FLAGS_DESTROY_CID_IN_DONE)/*Note[cid]*/) {
bthread_id_about_to_destroy(info.id);
}
EndRPC(info);
}
}