void ParallelChannel::CallMethod()

in src/brpc/parallel_channel.cpp [580:764]


// Sends the RPC described by `method'/`request' through every sub channel
// of this ParallelChannel.
//
// Flow:
//   1. Lock the controller's call_id. If locking fails, `cntl' is failed with
//      EINVAL (unless it already failed) and `done' is run in place.
//   2. Build one SubCall per sub channel: either from the channel's
//      CallMapper, or by reusing `method'/`request' together with a fresh
//      response->New() tagged DELETE_RESPONSE.
//   3. Clamp fail_limit/success_limit into [1, #non-skipped sub calls] and
//      create the ParallelChannelDone `d' that receives the sub results.
//   4. Install `d' as cntl->_done, arm the timeout timer (if any), unlock the
//      call_id and issue every non-skipped sub call, forwarding the request
//      attachment to each of them.
//   5. Synchronous calls (done == NULL) Join() the call_id before returning.
//
// Any error detected before the sub calls are issued jumps to FAIL. FAIL
// releases what was allocated so far, runs `done' (if any) and destroys the
// call_id.
void ParallelChannel::CallMethod(
    const google::protobuf::MethodDescriptor* method,
    google::protobuf::RpcController* cntl_base,
    const google::protobuf::Message* request,
    google::protobuf::Message* response,
    google::protobuf::Closure* done) {
    Controller* cntl = static_cast<Controller*>(cntl_base);
    cntl->OnRPCBegin(butil::gettimeofday_us());
    // Make sure cntl->sub_count() always equal #sub-channels
    const int nchan = _chans.size();
    cntl->_pchan_sub_count = nchan;

    const CallId cid = cntl->call_id();
    const int rc = bthread_id_lock(cid, NULL);
    if (rc != 0) {
        CHECK_EQ(EINVAL, rc);
        if (!cntl->FailedInline()) {
            cntl->SetFailed(EINVAL, "Fail to lock call_id=%" PRId64, cid.value);
        }
        LOG_IF(ERROR, cntl->is_used_by_rpc())
            << "Controller=" << cntl << " was used by another RPC before. "
            "Did you forget to Reset() it before reuse?";
        // Have to run done in-place.
        // Read comment in CallMethod() in channel.cpp for details.
        if (done) {
            done->Run();
        }
        return;
    }
    cntl->set_used_by_rpc();

    // Everything referenced on the FAIL path is declared here, before the
    // first `goto FAIL', because a goto must not jump past the
    // initialization of a variable that is still in scope at the label.
    ParallelChannelDone* d = NULL;
    int ndone = nchan;  // number of sub calls that are not skipped
    int fail_limit = 1;
    int success_limit = 1;
    // aps[0, nmapped) have been filled in. Until they are handed over to `d',
    // the requests/responses they flag with DELETE_REQUEST/DELETE_RESPONSE
    // are owned here and must be freed if the call fails.
    int nmapped = 0;
    DEFINE_SMALL_ARRAY(SubCall, aps, nchan, 64);

    if (cntl->FailedInline()) {
        // The call_id is cancelled before RPC.
        goto FAIL;
    }
    // we don't support http whose response is NULL.
    if (response == NULL) {
        cntl->SetFailed(EINVAL, "response must be non-NULL");
        goto FAIL;
    }
    if (nchan == 0) {
        cntl->SetFailed(EPERM, "No channels added");
        goto FAIL;
    }

    for (int i = 0; i < nchan; ++i) {
        SubChan& sub_chan = _chans[i];
        if (sub_chan.call_mapper != NULL) {
            aps[i] = sub_chan.call_mapper->Map(i, nchan, method, request, response);
            nmapped = i + 1;
            // Test is_skip first because it implies is_bad.
            if (aps[i].is_skip()) {
                --ndone;
            } else if (aps[i].is_bad()) {
                cntl->SetFailed(
                    EREQUEST, "CallMapper of channel[%d] returns Bad()", i);
                goto FAIL;
            }
        } else {
            google::protobuf::Message* cur_res = response->New();
            if (cur_res == NULL) {
                cntl->SetFailed(ENOMEM, "Fail to new response");
                goto FAIL;
            }
            aps[i] = SubCall(method, request, cur_res, DELETE_RESPONSE);
            nmapped = i + 1;
        }
    }
    if (ndone <= 0) {
        cntl->SetFailed(ECANCELED, "Skipped all channels(%d)", nchan);
        goto FAIL;
    }

    if (_options.fail_limit < 0) {
        // Both Controller and ParallelChannel haven't set `fail_limit'
        fail_limit = ndone;
    } else {
        fail_limit = _options.fail_limit;
        if (fail_limit < 1) {
            fail_limit = 1;
        } else if (fail_limit > ndone) {
            fail_limit = ndone;
        }
    }

    // `success_limit' is only valid when `fail_limit' is not set.
    if (_options.fail_limit >= 0 || _options.success_limit < 0) {
        success_limit = ndone;
    } else {
        success_limit = _options.success_limit;
        if (success_limit < 1) {
            success_limit = 1;
        } else if (success_limit > ndone) {
            success_limit = ndone;
        }
    }

    d = ParallelChannelDone::Create(
        fail_limit, success_limit, ndone, aps, nchan, cntl, done);
    if (NULL == d) {
        cntl->SetFailed(ENOMEM, "Fail to new ParallelChannelDone");
        goto FAIL;
    }

    // Hand every non-skipped SubCall over to `d'; from here on the FAIL path
    // releases them through ParallelChannelDone::Destroy(d).
    for (int i = 0, j = 0; i < nchan; ++i) {
        SubChan& sub_chan = _chans[i];
        if (!aps[i].is_skip()) {
            ParallelChannelDone::SubDone* sd = d->sub_done(j++);
            sd->ap = aps[i];
            sd->shared_data = d;
            sd->merger = sub_chan.merger;
        }
    }
    cntl->_response = response;
    cntl->_done = d;
    cntl->add_flag(Controller::FLAGS_DESTROY_CID_IN_DONE);

    if (cntl->timeout_ms() == UNSET_MAGIC_NUM) {
        cntl->set_timeout_ms(_options.timeout_ms);
    }
    if (cntl->timeout_ms() >= 0) {
        cntl->_deadline_us = cntl->timeout_ms() * 1000L + cntl->_begin_time_us;
        // Setup timer for RPC timeout
        const int rc = bthread_timer_add(
            &cntl->_timeout_id,
            butil::microseconds_to_timespec(cntl->_deadline_us),
            HandleTimeout, (void*)cid.value);
        if (rc != 0) {
            cntl->SetFailed(rc, "Fail to add timer");
            goto FAIL;
        }
    } else {
        cntl->_deadline_us = -1;
    }
    d->SaveThreadInfoOfCallsite();
    CHECK_EQ(0, bthread_id_unlock(cid));
    // Don't touch `cntl' and `d' again (for async RPC)
    // NOTE(review): the loop below still reads `cntl' and `d' before issuing
    // each sub call. This presumably relies on the RPC being unable to finish
    // until the last sub call has been issued; confirm before reordering.

    for (int i = 0, j = 0; i < nchan; ++i) {
        if (!aps[i].is_skip()) {
            ParallelChannelDone::SubDone* sd = d->sub_done(j++);
            // Forward the attachment to each sub call
            sd->cntl.request_attachment().append(cntl->request_attachment());
            _chans[i].chan->CallMethod(sd->ap.method, &sd->cntl,
                                       sd->ap.request, sd->ap.response, sd);
        }
        // Although we can delete request (if delete_request is true) after
        // starting sub call, we leave it in ~SubCall(called when d is
        // Destroy()-ed) because we may need to check requests for debugging
        // purposes.
    }
    if (done == NULL) {
        Join(cid);
        cntl->OnRPCEnd(butil::gettimeofday_us());
    }
    return;

FAIL:
    // The RPC was failed after locking call_id and before calling sub channels.
    if (d) {
        // `d' holds the SubCalls now; Destroy() is expected to release the
        // messages they own.
        // Set the _done to NULL to make sure cntl->sub(any_index) is NULL.
        cntl->_done = NULL;
        ParallelChannelDone::Destroy(d);
    } else {
        // Ownership never reached a ParallelChannelDone: free the messages
        // owned by the SubCalls built so far, otherwise they would leak
        // (e.g. responses from response->New() when a later CallMapper
        // returns Bad(), or when ParallelChannelDone::Create() fails).
        for (int i = 0; i < nmapped; ++i) {
            if (aps[i].flags & DELETE_REQUEST) {
                delete aps[i].request;
            }
            if (aps[i].flags & DELETE_RESPONSE) {
                delete aps[i].response;
            }
        }
    }
    if (done) {
        if (!cntl->is_done_allowed_to_run_in_place()) {
            bthread_t bh;
            bthread_attr_t attr = (FLAGS_usercode_in_pthread ?
                                   BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
            // Hack: save done in cntl->_done to remove a malloc of args.
            cntl->_done = done;
            if (bthread_start_background(&bh, &attr, RunDoneAndDestroy, cntl) == 0) {
                return;
            }
            cntl->_done = NULL;
            LOG(FATAL) << "Fail to start bthread";
        }
        done->Run();
    }
    CHECK_EQ(0, bthread_id_unlock_and_destroy(cid));
}