in src/brpc/parallel_channel.cpp [580:764]
// Issue `method' on every sub channel in `_chans' and complete the parent RPC
// through one shared ParallelChannelDone.
//
// Params:
//   method    - method forwarded to CallMapper::Map() or used directly for
//               channels without a mapper.
//   cntl_base - must actually be a brpc Controller (it is static_cast-ed).
//   request   - request forwarded to mappers / sub calls.
//   response  - must be non-NULL; also used as the prototype (response->New())
//               for sub responses of channels without a mapper.
//   done      - NULL makes the call wait (Join) until the RPC ends; otherwise
//               it is handed to ParallelChannelDone, or run directly by this
//               function when the RPC fails before any sub call is issued.
//
// Failures detected before sub calls are issued are reported through
// cntl->SetFailed() and handled at the FAIL label.
void ParallelChannel::CallMethod(
    const google::protobuf::MethodDescriptor* method,
    google::protobuf::RpcController* cntl_base,
    const google::protobuf::Message* request,
    google::protobuf::Message* response,
    google::protobuf::Closure* done) {
    Controller* cntl = static_cast<Controller*>(cntl_base);
    cntl->OnRPCBegin(butil::gettimeofday_us());
    // Make sure cntl->sub_count() always equal #sub-channels
    const int nchan = _chans.size();
    cntl->_pchan_sub_count = nchan;

    const CallId cid = cntl->call_id();
    const int rc = bthread_id_lock(cid, NULL);
    if (rc != 0) {
        CHECK_EQ(EINVAL, rc);
        if (!cntl->FailedInline()) {
            cntl->SetFailed(EINVAL, "Fail to lock call_id=%" PRId64, cid.value);
        }
        LOG_IF(ERROR, cntl->is_used_by_rpc())
            << "Controller=" << cntl << " was used by another RPC before. "
            "Did you forget to Reset() it before reuse?";
        // Have to run done in-place.
        // Read comment in CallMethod() in channel.cpp for details.
        if (done) {
            done->Run();
        }
        return;
    }
    cntl->set_used_by_rpc();

    // All locals used after the FAIL label are declared before the first
    // `goto FAIL' so that no jump bypasses an initialization.
    ParallelChannelDone* d = NULL;
    int ndone = nchan;       // number of sub calls that are not skipped
    int fail_limit = 1;
    int success_limit = 1;
    int nprepared = 0;       // aps[0, nprepared) have been filled in
    DEFINE_SMALL_ARRAY(SubCall, aps, nchan, 64);

    if (cntl->FailedInline()) {
        // The call_id is cancelled before RPC.
        goto FAIL;
    }
    // we don't support http whose response is NULL.
    if (response == NULL) {
        cntl->SetFailed(EINVAL, "response must be non-NULL");
        goto FAIL;
    }
    if (nchan == 0) {
        cntl->SetFailed(EPERM, "No channels added");
        goto FAIL;
    }

    // Build one SubCall per sub channel.
    for (int i = 0; i < nchan; ++i) {
        SubChan& sub_chan = _chans[i];
        if (sub_chan.call_mapper != NULL) {
            aps[i] = sub_chan.call_mapper->Map(i, nchan, method, request, response);
            // Test is_skip first because it implies is_bad.
            if (aps[i].is_skip()) {
                --ndone;
            } else if (aps[i].is_bad()) {
                cntl->SetFailed(
                    EREQUEST, "CallMapper of channel[%d] returns Bad()", i);
                goto FAIL;
            }
        } else {
            // No mapper: send the original request and collect the result in
            // a fresh response of the same type, created by this function.
            google::protobuf::Message* cur_res = response->New();
            if (cur_res == NULL) {
                cntl->SetFailed(ENOMEM, "Fail to new response");
                goto FAIL;
            }
            aps[i] = SubCall(method, request, cur_res, DELETE_RESPONSE);
        }
        nprepared = i + 1;
    }
    if (ndone <= 0) {
        cntl->SetFailed(ECANCELED, "Skipped all channels(%d)", nchan);
        goto FAIL;
    }

    if (_options.fail_limit < 0) {
        // Both Controller and ParallelChannel haven't set `fail_limit'
        fail_limit = ndone;
    } else {
        // Clamp the configured value into [1, ndone].
        fail_limit = _options.fail_limit;
        if (fail_limit < 1) {
            fail_limit = 1;
        } else if (fail_limit > ndone) {
            fail_limit = ndone;
        }
    }

    // `success_limit' is only valid when `fail_limit' is not set.
    if (_options.fail_limit >= 0 || _options.success_limit < 0) {
        success_limit = ndone;
    } else {
        // Clamp the configured value into [1, ndone].
        success_limit = _options.success_limit;
        if (success_limit < 1) {
            success_limit = 1;
        } else if (success_limit > ndone) {
            success_limit = ndone;
        }
    }

    d = ParallelChannelDone::Create(
        fail_limit, success_limit, ndone, aps, nchan, cntl, done);
    if (NULL == d) {
        cntl->SetFailed(ENOMEM, "Fail to new ParallelChannelDone");
        goto FAIL;
    }

    // Hand the non-skipped SubCalls over to the sub-dones of `d'.
    // `j' indexes sub-dones (skipped channels have none), `i' indexes channels.
    for (int i = 0, j = 0; i < nchan; ++i) {
        SubChan& sub_chan = _chans[i];
        if (!aps[i].is_skip()) {
            ParallelChannelDone::SubDone* sd = d->sub_done(j++);
            sd->ap = aps[i];
            sd->shared_data = d;
            sd->merger = sub_chan.merger;
        }
    }
    cntl->_response = response;
    cntl->_done = d;
    cntl->add_flag(Controller::FLAGS_DESTROY_CID_IN_DONE);

    // Fall back to the channel-level timeout when the controller has none.
    if (cntl->timeout_ms() == UNSET_MAGIC_NUM) {
        cntl->set_timeout_ms(_options.timeout_ms);
    }
    if (cntl->timeout_ms() >= 0) {
        cntl->_deadline_us = cntl->timeout_ms() * 1000L + cntl->_begin_time_us;
        // Setup timer for RPC timeout
        const int rc = bthread_timer_add(
            &cntl->_timeout_id,
            butil::microseconds_to_timespec(cntl->_deadline_us),
            HandleTimeout, (void*)cid.value);
        if (rc != 0) {
            cntl->SetFailed(rc, "Fail to add timer");
            goto FAIL;
        }
    } else {
        // Negative timeout: no deadline, no timer.
        cntl->_deadline_us = -1;
    }
    d->SaveThreadInfoOfCallsite();
    CHECK_EQ(0, bthread_id_unlock(cid));
    // Don't touch `cntl' and `d' again (for async RPC)

    for (int i = 0, j = 0; i < nchan; ++i) {
        if (!aps[i].is_skip()) {
            ParallelChannelDone::SubDone* sd = d->sub_done(j++);
            // Forward the attachment to each sub call
            sd->cntl.request_attachment().append(cntl->request_attachment());
            _chans[i].chan->CallMethod(sd->ap.method, &sd->cntl,
                                       sd->ap.request, sd->ap.response, sd);
        }
        // Although we can delete request (if delete_request is true) after
        // starting sub call, we leave it to the cleanup done when d is
        // Destroy()-ed because we may need to check requests for debugging
        // purposes.
    }
    if (done == NULL) {
        // No user callback: wait here until the RPC ends.
        Join(cid);
        cntl->OnRPCEnd(butil::gettimeofday_us());
    }
    return;

FAIL:
    // The RPC was failed after locking call_id and before calling sub channels.
    if (d) {
        // Set the _done to NULL to make sure cntl->sub(any_index) is NULL.
        cntl->_done = NULL;
        ParallelChannelDone::Destroy(d);
    } else {
        // `d' never took over the sub calls, so nothing else will release the
        // responses that this function created with response->New() for
        // channels without a CallMapper. Free them here to avoid a leak.
        // NOTE(review): SubCalls returned by a CallMapper may own their
        // request/response as well; their ownership flags are not visible
        // from this function, so they are left untouched -- confirm and
        // extend this cleanup if needed.
        for (int i = 0; i < nprepared; ++i) {
            if (_chans[i].call_mapper == NULL) {
                delete aps[i].response;
            }
        }
    }
    if (done) {
        if (!cntl->is_done_allowed_to_run_in_place()) {
            bthread_t bh;
            bthread_attr_t attr = (FLAGS_usercode_in_pthread ?
                                   BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
            // Hack: save done in cntl->_done to remove a malloc of args.
            cntl->_done = done;
            if (bthread_start_background(&bh, &attr, RunDoneAndDestroy, cntl) == 0) {
                return;
            }
            // Could not start the bthread: undo the hack and fall back to
            // running done in place below.
            cntl->_done = NULL;
            LOG(FATAL) << "Fail to start bthread";
        }
        done->Run();
    }
    CHECK_EQ(0, bthread_id_unlock_and_destroy(cid));
}