void Controller::OnVersionedRPCReturned()

in src/brpc/controller.cpp [595:755]


void Controller::OnVersionedRPCReturned(const CompletionInfo& info,
                                        bool new_bthread, int saved_error) {
    // TODO(gejun): Simplify call-ending code.
    // Intercept previous calls
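    // Each attempt (the initial request, retries and the backup request) gets
    // its own versioned id. A completion whose id matches neither the
    // correlation id nor the current attempt comes from an earlier attempt
    // and is handled here: a successful response for the still-pending
    // _unfinished_call breaks out to end the RPC, anything else is dropped.
    // The while-loop never loops; it is only used as an if-with-break.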
    while (info.id != _correlation_id && info.id != current_id()) {
        if (_unfinished_call && get_id(_unfinished_call->nretry) == info.id) {
            if (!FailedInline()) {
                // Continue with successful backup request.
                break;
            }
            // Complete failed backup request.
            _unfinished_call->OnComplete(this, _error_code, info.responded, false);
            delete _unfinished_call;
            _unfinished_call = NULL;
        }
        // Ignore all non-backup requests and failed backup requests.
        _error_code = saved_error;
        response_attachment().clear();
        CHECK_EQ(0, bthread_id_unlock(info.id));
        return;
    }

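    // Nothing more to try: either the call succeeded and no user RetryPolicy
    // is installed to override that, or the retry quota is used up. Finish.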
    if ((!_error_code && _retry_policy == NULL) ||
        _current_call.nretry >= _max_retry) {
        goto END_OF_RPC;
    }
    if (_error_code == EBACKUPREQUEST) {
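        // EBACKUPREQUEST means the backup-request timer fired before a
        // response arrived: keep the pending attempt alive as _unfinished_call
        // and issue one additional request for the same RPC.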
        if (NULL != _backup_request_policy && !_backup_request_policy->DoBackup(this)) {
            // No need to do backup request.
            _error_code = saved_error;
            CHECK_EQ(0, bthread_id_unlock(info.id));
            return;
        }

        // The timer that fired was the backup-request timer; arm the real
        // deadline timer now (only if a timeout was configured).
        int rc = 0;
        if (timeout_ms() >= 0) {
            rc = bthread_timer_add(
                    &_timeout_id,
                    butil::microseconds_to_timespec(_deadline_us),
                    HandleTimeout, (void*)_correlation_id.value);
        }
        if (rc != 0) {
            SetFailed(rc, "Fail to add timer");
            goto END_OF_RPC;
        }
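        // Remember the server of the still-pending attempt so that server
        // selection for the backup request avoids it (when there is more than
        // one server behind the channel).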
        if (!SingleServer()) {
            if (_accessed == NULL) {
                _accessed = ExcludedServers::Create(
                    std::min(_max_retry, RETRY_AVOIDANCE));
                if (NULL == _accessed) {
                    SetFailed(ENOMEM, "Fail to create ExcludedServers");
                    goto END_OF_RPC;
                }
            }
            _accessed->Add(_current_call.peer_id);
        }
        // _current_call does not end yet.
        CHECK(_unfinished_call == NULL);  // only one backup request now.
        _unfinished_call = new (std::nothrow) Call(&_current_call);
        if (_unfinished_call == NULL) {
            SetFailed(ENOMEM, "Fail to new Call");
            goto END_OF_RPC;
        }
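        // The backup request consumes one retry, is flagged as a backup
        // request on this controller, and is issued immediately.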
        ++_current_call.nretry;
        add_flag(FLAGS_BACKUP_REQUEST);
        return IssueRPC(butil::gettimeofday_us());
    } else {
        auto retry_policy = _retry_policy ? _retry_policy : DefaultRetryPolicy();
        if (retry_policy->DoRetry(this)) {
            // The error must come from _current_call because:
            //  * errors from _unfinished_call were intercepted at the top of
            //    OnVersionedRPCReturned;
            //  * ERPCTIMEDOUT/ECANCELED are not retryable errors by default.
            CHECK_EQ(current_id(), info.id) << "error_code=" << _error_code;
            if (!SingleServer()) {
                if (_accessed == NULL) {
                    _accessed = ExcludedServers::Create(
                            std::min(_max_retry, RETRY_AVOIDANCE));
                    if (NULL == _accessed) {
                        SetFailed(ENOMEM, "Fail to create ExcludedServers");
                        goto END_OF_RPC;
                    }
                }
                _accessed->Add(_current_call.peer_id);
            }
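            // Close out the failed attempt before preparing the retry.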
            _current_call.OnComplete(this, _error_code, info.responded, false);
            ++_current_call.nretry;
            // Clear http responses before retrying, otherwise the response may
            // be mixed with older (and undefined) stuff. This is actually not
            // done before r32008.
            if (_http_response) {
                _http_response->Clear();
            }
            response_attachment().clear();

            // Retry backoff.
            bthread::TaskGroup* g = bthread::tls_task_group;
            int64_t backoff_time_us = retry_policy->GetBackoffTimeMs(this) * 1000L;
            if (backoff_time_us > 0 &&
                backoff_time_us < _deadline_us - butil::gettimeofday_us()) {
                // The condition above guarantees the backoff fits within the
                // remaining RPC time; otherwise it is skipped entirely.
                if (retry_policy->CanRetryBackoffInPthread() ||
                    (g && !g->is_current_pthread_task())) {
                    bthread_usleep(backoff_time_us);
                } else {
                    LOG(WARNING) << "`CanRetryBackoffInPthread()' returns false, "
                                    "skip retry backoff in pthread.";
                }
            }
            return IssueRPC(butil::gettimeofday_us());
        }
    }

END_OF_RPC:
    if (new_bthread && !FLAGS_usercode_in_coroutine) {
        // [ Essential for -usercode_in_pthread=true ]
        // When -usercode_in_pthread is on, the reserved threads (set by
        // -usercode_backup_threads) may all block on bthread_id_lock in
        // ProcessXXXResponse() until the id is unlocked or destroyed, which
        // happens in a new bthread when new_bthread is true. Since all
        // workers are blocked, that bthread will never be scheduled,
        // resulting in deadlock.
        // Making the id unlockable before creating the bthread fixes the issue.
        // When -usercode_in_pthread is false, this also removes some useless
        // waiting by the bthreads processing responses.

        // Note[_done]: the callid is destroyed after _done, which may take
        // a long time; stop useless locking as early as possible.

        // Note[cid]: When the callid needs to be destroyed in done->Run(),
        // it is not necessarily destroyed directly inside done->Run();
        // the callid may still be locked/unlocked many times before being
        // destroyed. E.g. in a selective channel, the callid is referenced
        // by multiple sub-dones and only destroyed by the last one. Calling
        // bthread_id_about_to_destroy right here, which would make the id
        // no longer lockable, is wrong. On the other hand, combo channels
        // that set FLAGS_DESTROY_CID_IN_DONE to true must be aware of
        // -usercode_in_pthread and avoid the deadlock on their own. (TBR)

        if ((FLAGS_usercode_in_pthread || _done != NULL/*Note[_done]*/) &&
            !has_flag(FLAGS_DESTROY_CID_IN_DONE)/*Note[cid]*/) {
            bthread_id_about_to_destroy(info.id);
        }
        // No need to join this bthread since the RPC caller won't wake up
        // (nor will the user's done be called) until this bthread finishes.
        bthread_t bt;
        bthread_attr_t attr = (FLAGS_usercode_in_pthread ?
                               BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
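        // Stash the completion info for RunEndRPC, which finishes the RPC
        // (EndRPC) on the newly started bthread.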
        _tmp_completion_info = info;
        if (bthread_start_background(&bt, &attr, RunEndRPC, this) != 0) {
            LOG(FATAL) << "Fail to start bthread";
            EndRPC(info);
        }
    } else {
        if (_done != NULL/*Note[_done]*/ &&
            !has_flag(FLAGS_DESTROY_CID_IN_DONE)/*Note[cid]*/) {
            bthread_id_about_to_destroy(info.id);
        }
        EndRPC(info);
    }
}
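
For context on the retry branch above: the RetryPolicy is consulted through
three calls. DoRetry() decides whether the failed attempt should be re-issued,
GetBackoffTimeMs() supplies an optional pause before the retry, and
CanRetryBackoffInPthread() says whether that pause is also allowed when the
caller is running as a pthread task. The sketch below is not part of
controller.cpp; it assumes the brpc::RetryPolicy virtual methods take a
const Controller* (matching how they are invoked above with `this`) and that
a custom policy is installed through ChannelOptions.retry_policy.

#include <errno.h>
#include <brpc/controller.h>
#include <brpc/retry_policy.h>

// Sketch only: retry connection-level failures with a short fixed backoff.
class RetryConnFailuresWithBackoff : public brpc::RetryPolicy {
public:
    bool DoRetry(const brpc::Controller* cntl) const override {
        // Timeouts/cancellation are deliberately not retried, consistent with
        // the comment inside OnVersionedRPCReturned.
        const int ec = cntl->ErrorCode();
        return ec == ECONNREFUSED || ec == ECONNRESET || ec == EHOSTDOWN;
    }
    int32_t GetBackoffTimeMs(const brpc::Controller* /*cntl*/) const override {
        // OnVersionedRPCReturned skips this pause when it would exceed the
        // remaining deadline of the RPC.
        return 10;
    }
    bool CanRetryBackoffInPthread() const override {
        // Returning false makes the code above log a warning and skip the
        // pause whenever the current task runs in a pthread.
        return false;
    }
};

A single long-lived instance of such a policy would normally be shared by the
channel(s) that reference it, since the controller only keeps a pointer to it.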