in src/brpc/controller.cpp [777:869]
void Controller::Call::OnComplete(
Controller* c, int error_code/*note*/, bool responded, bool end_of_rpc) {
if (stream_user_data) {
stream_user_data->DestroyStreamUserData(sending_sock, c, error_code, end_of_rpc);
stream_user_data = NULL;
}
if (sending_sock != NULL) {
if (error_code != 0) {
sending_sock->AddRecentError();
}
if (enable_circuit_breaker) {
sending_sock->FeedbackCircuitBreaker(error_code,
butil::gettimeofday_us() - begin_time_us);
}
}
switch (c->connection_type()) {
case CONNECTION_TYPE_UNKNOWN:
break;
case CONNECTION_TYPE_SINGLE:
// Set main socket to be failed for connection refusal of streams.
// "single" streams are often maintained in a separate SocketMap and
// different from the main socket as well.
if (c->_stream_creator != NULL &&
does_error_affect_main_socket(error_code) &&
(sending_sock == NULL || sending_sock->id() != peer_id)) {
Socket::SetFailed(peer_id);
}
break;
case CONNECTION_TYPE_POOLED:
// NOTE: Not reuse pooled connection if this call fails and no response
// has been received through this connection
// Otherwise in-flight responses may come back in future and break the
// assumption that one pooled connection cannot have more than one
// message at the same time.
if (sending_sock != NULL && (error_code == 0 || responded)) {
if (!sending_sock->is_read_progressive()) {
// Normally-read socket which will not be used after RPC ends,
// safe to return. Notice that Socket::is_read_progressive may
// differ from Controller::is_response_read_progressively()
// because RPC possibly ends before setting up the socket.
sending_sock->ReturnToPool();
} else {
// Progressively-read socket. Should be returned when the read
// ends. The method handles the details.
sending_sock->OnProgressiveReadCompleted();
}
break;
}
// fall through
case CONNECTION_TYPE_SHORT:
if (sending_sock != NULL) {
// Check the comment in CONNECTION_TYPE_POOLED branch.
if (!sending_sock->is_read_progressive()) {
if (c->_stream_creator == NULL) {
sending_sock->SetFailed();
}
} else {
sending_sock->OnProgressiveReadCompleted();
}
}
if (does_error_affect_main_socket(error_code)) {
// main socket should die as well.
// NOTE: main socket may be wrongly set failed (provided that
// short/pooled socket does not hold a ref of the main socket).
// E.g. an in-parallel RPC sets the peer_id to be failed
// -> this RPC meets ECONNREFUSED
// -> main socket gets revived from HC
// -> this RPC sets main socket to be failed again.
Socket::SetFailed(peer_id);
}
break;
}
if (ELOGOFF == error_code) {
SocketUniquePtr sock;
if (Socket::Address(peer_id, &sock) == 0) {
// Block this `Socket' while not closing the fd
sock->SetLogOff();
}
}
if (need_feedback) {
const LoadBalancer::CallInfo info =
{ begin_time_us, peer_id, error_code, c };
c->_lb->Feedback(info);
}
// Release the `Socket' we used to send/receive data
sending_sock.reset(NULL);
}