void ProcessHttpResponse()

in src/brpc/policy/http_rpc_protocol.cpp [345:561]


// Processes one received HTTP/1.x or HTTP/2 response. `msg' is treated as an
// HttpContext (and additionally as an H2StreamContext when its header reports
// HTTP/2).
// Steps:
//   1. Find the correlation id of the RPC and lock it to obtain the Controller.
//   2. Record timing/size information on the Span, if there is one.
//   3. Swap the response header into cntl->http_response() and inspect it
//      together with the body, either failing the Controller or filling
//      cntl->response() / cntl->response_attachment().
//   4. Release `msg' and call accessor.OnResponse().
// If the correlation id is missing or cannot be locked, the function returns
// early without touching any Controller; `msg' is left to imsg_guard.
void ProcessHttpResponse(InputMessageBase* msg) {
    const int64_t start_parse_us = butil::cpuwide_time_us();
    // From here on `msg' is managed by imsg_guard, including on early returns.
    DestroyingPtr<HttpContext> imsg_guard(static_cast<HttpContext*>(msg));
    Socket* socket = imsg_guard->socket();
    uint64_t cid_value;
    // Must be read before the header is swapped into the Controller below.
    const bool is_http2 = imsg_guard->header().is_http2();
    if (is_http2) {
        // HTTP/2: the correlation id is taken from the stream context.
        H2StreamContext* h2_sctx = static_cast<H2StreamContext*>(msg);
        cid_value = h2_sctx->correlation_id();
    } else {
        // HTTP/1.x: the correlation id is taken from the socket.
        cid_value = socket->correlation_id();
    }
    if (cid_value == 0) {
        LOG(WARNING) << "Fail to find correlation_id from " << *socket;
        return;
    }
    const bthread_id_t cid = { cid_value };
    Controller* cntl = NULL;
    // On success `cntl' is the data associated with `cid' and the id stays
    // locked until accessor.OnResponse() at the end of this function.
    const int rc = bthread_id_lock(cid, (void**)&cntl);
    if (rc != 0) {
        // EINVAL and EPERM are not logged.
        // NOTE(review): presumably these mean the RPC already ended - confirm.
        LOG_IF(ERROR, rc != EINVAL && rc != EPERM)
            << "Fail to lock correlation_id=" << cid << ": " << berror(rc);
        return;
    }

    ControllerPrivateAccessor accessor(cntl);

    Span* span = accessor.span();
    if (span) {
        span->set_base_real_us(msg->base_real_us());
        span->set_received_us(msg->received_us());
        // TODO: changing when imsg_guard->read_body_progressively() is true
        span->set_response_size(imsg_guard->parsed_length());
        span->set_start_parse_us(start_parse_us);
    }

    // Hand the received header over to the Controller. After the swap the
    // response header must be read through res_header, not imsg_guard.
    HttpHeader* res_header = &cntl->http_response();
    res_header->Swap(imsg_guard->header());
    // res_body refers into *imsg_guard; it is only used before
    // imsg_guard.reset() at the end of this function.
    butil::IOBuf& res_body = imsg_guard->body();
    CHECK(cntl->response_attachment().empty());
    // Error code before any SetFailed() below; passed to OnResponse().
    const int saved_error = cntl->ErrorCode();

    bool is_grpc_ct = false;
    const HttpContentType content_type =
        ParseContentType(res_header->content_type(), &is_grpc_ct);
    // gRPC is only recognized on HTTP/2 with a gRPC content-type.
    const bool is_grpc = (is_http2 && is_grpc_ct);
    bool grpc_compressed = false;  // only valid when is_grpc is true.
    
    // Single-pass block: every `break' jumps to the common epilogue below.
    do {
        if (!is_http2) {
            // If header has "Connection: close", close the connection.
            const std::string* conn_cmd = res_header->GetHeader(common->CONNECTION);
            if (conn_cmd != NULL && 0 == strcasecmp(conn_cmd->c_str(), "close")) {
                // Server asked to close the connection.
                if (imsg_guard->read_body_progressively()) {
                    // Close the socket when reading completes.
                    socket->read_will_be_progressive(CONNECTION_TYPE_SHORT);
                } else {
                    socket->SetFailed();
                }
            }
        } else if (is_grpc) {
            // Strip the gRPC prefix from the body; this also reports whether
            // the payload is flagged as compressed.
            if (!RemoveGrpcPrefix(&res_body, &grpc_compressed)) {
                cntl->SetFailed(ERESPONSE, "Invalid gRPC response");
                break;
            }
            const std::string* grpc_status = res_header->GetHeader(common->GRPC_STATUS);
            if (grpc_status) {
                // TODO: More strict parsing
                // (strtol yields 0 when the value has no leading digits.)
                GrpcStatus status = (GrpcStatus)strtol(grpc_status->data(), NULL, 10);
                if (status != GRPC_OK) {
                    // Prefer the server-provided message (percent-decoded);
                    // otherwise fall back to the textual name of the status.
                    const std::string* grpc_message =
                        res_header->GetHeader(common->GRPC_MESSAGE);
                    if (grpc_message) {
                        std::string message_decoded;
                        PercentDecode(*grpc_message, &message_decoded);
                        cntl->SetFailed(GrpcStatusToErrorCode(status), "%s",
                                        message_decoded.c_str());
                    } else {
                        cntl->SetFailed(GrpcStatusToErrorCode(status), "%s",
                                        GrpcStatusToString(status));
                    }
                    break;
                }
            }
        }

        if (imsg_guard->read_body_progressively()) {
            // Set RPA if needed
            accessor.set_readable_progressive_attachment(imsg_guard.get());
            const int sc = res_header->status_code();
            if (sc < 200 || sc >= 300) {
                // Even if the body is for streaming purpose, a non-OK status
                // code indicates that the body is probably the error text
                // which is helpful for debugging.
                // content may be binary data, so the size limit is a must.
                // copy_to() is used here, so res_body itself is left intact.
                // NOTE(review): size() is narrowed to int before std::min;
                // confirm the body cannot exceed INT_MAX bytes here.
                std::string body_str;
                res_body.copy_to(
                    &body_str, std::min((int)res_body.size(),
                                        FLAGS_http_max_error_length));
                cntl->SetFailed(EHTTP, "HTTP/%d.%d %d %s: %.*s",
                                res_header->major_version(),
                                res_header->minor_version(),
                                static_cast<int>(res_header->status_code()),
                                res_header->reason_phrase(),
                                (int)body_str.size(), body_str.c_str());
            } else if (cntl->response() != NULL &&
                       cntl->response()->GetDescriptor()->field_count() != 0) {
                // The caller expects a protobuf response with fields, which is
                // rejected when the body is read progressively.
                cntl->SetFailed(ERESPONSE, "A protobuf response can't be parsed"
                                " from progressively-read HTTP body");
            }
            // A progressively-read body is never parsed below.
            break;
        }
        
        // Fail RPC if status code is an error in http sense.
        // ErrorCode of RPC is unified to EHTTP.
        const int sc = res_header->status_code();
        if (sc < 200 || sc >= 300) {
            std::string err = butil::string_printf(
                    "HTTP/%d.%d %d %s",
                    res_header->major_version(),
                    res_header->minor_version(),
                    static_cast<int>(res_header->status_code()),
                    res_header->reason_phrase());
            if (!res_body.empty()) {
                // Use content as error text if it's present. Notice that
                // content may be binary data, so the size limit is a must.
                // NOTE(review): same int narrowing of size() as above.
                err.append(": ");
                res_body.append_to(
                    &err, std::min((int)res_body.size(),
                                        FLAGS_http_max_error_length));
            }
            // If server return brpc error code by x-bd-error-code,
            // set the returned error code to controller. Otherwise,
            // set EHTTP to controller uniformly.
            // A missing header, or one without leading digits, gives
            // error_code == 0 and therefore EHTTP.
            const std::string* error_code_ptr = res_header->GetHeader(common->ERROR_CODE);
            int error_code = error_code_ptr ? strtol(error_code_ptr->data(), NULL, 10) : 0;
            if (FLAGS_use_http_error_code && error_code != 0) {
                cntl->SetFailed(error_code, "%s", err.c_str());
            } else {
                cntl->SetFailed(EHTTP, "%s", err.c_str());
            }
            if (cntl->response() == NULL ||
                cntl->response()->GetDescriptor()->field_count() == 0) {
                // A http call. Http users may need the body(containing a html,
                // json etc) even if the http call was failed. This is different
                // from protobuf services where responses are undefined when RPC
                // was failed.
                cntl->response_attachment().swap(res_body);
            }
            break;
        }
        // 2xx from here on.
        if (cntl->response() == NULL ||
            cntl->response()->GetDescriptor()->field_count() == 0) {
            // a http call, content is the "real response".
            cntl->response_attachment().swap(res_body);
            break;
        }

        // A protobuf response with fields is expected: work out how the body
        // is encoded before parsing it.
        const std::string* encoding = NULL;
        if (is_grpc) {
            // For gRPC the encoding header is only consulted when the body
            // was flagged as compressed, and it is then mandatory.
            if (grpc_compressed) {
                encoding = res_header->GetHeader(common->GRPC_ENCODING);
                if (encoding == NULL) {
                    cntl->SetFailed(ERESPONSE, "Fail to find header `grpc-encoding' "
                                               "in compressed gRPC response");
                    break;
                }
            }
        } else {
            encoding = res_header->GetHeader(common->CONTENT_ENCODING);
        }
        // Only gzip is decompressed here; with any other encoding value the
        // body is passed to the parsers below unchanged.
        if (encoding != NULL && *encoding == common->GZIP) {
            TRACEPRINTF("Decompressing response=%lu",
                        (unsigned long)res_body.size());
            butil::IOBuf uncompressed;
            if (!policy::GzipDecompress(res_body, &uncompressed)) {
                cntl->SetFailed(ERESPONSE, "Fail to un-gzip response body");
                break;
            }
            res_body.swap(uncompressed);
        }
        // Parse the body into cntl->response() according to the content-type
        // determined from the response header.
        if (content_type == HTTP_CONTENT_PROTO) {
            if (!ParsePbFromIOBuf(cntl->response(), res_body)) {
                cntl->SetFailed(ERESPONSE, "Fail to parse content as %s",
                                cntl->response()->GetDescriptor()->full_name().c_str());
                break;
            }
        } else if (content_type == HTTP_CONTENT_PROTO_TEXT) {
            if (!ParsePbTextFromIOBuf(cntl->response(), res_body)) {
                cntl->SetFailed(ERESPONSE, "Fail to parse proto-text content as %s",
                                cntl->response()->GetDescriptor()->full_name().c_str());
                break;
            }
        } else if (content_type == HTTP_CONTENT_JSON) {
            // Message body is json.
            // No SetFailed() here: the helper receives cntl and ERESPONSE.
            // NOTE(review): presumably it reports the failure itself - confirm.
            if (!JsonToProtoMessage(res_body, cntl->response(), cntl, ERESPONSE)) {
                break;
            }
        } else if (content_type == HTTP_CONTENT_PROTO_JSON) {
            // Message body is json.
            // Same pattern as above: failure reporting is left to the helper.
            if (!ProtoJsonToProtoMessage(res_body, cntl->response(), cntl, ERESPONSE)) {
                break;
            }
        } else {
            cntl->SetFailed(ERESPONSE,
                            "Unknown content-type=%s when response is not NULL",
                            res_header->content_type().c_str());
            break;
        }
    } while (0);

    // Unlocks correlation_id inside. Revert controller's
    // error code if the version check of `cid' fails.
    // The message is released first; res_body must not be used past this
    // point. NOTE(review): confirm the reason for this ordering before
    // moving either statement.
    imsg_guard.reset();
    accessor.OnResponse(cid, saved_error);
}