void UbrpcAdaptor::ParseNsheadMeta()

in src/brpc/policy/ubrpc2pb_protocol.cpp [45:167]


void UbrpcAdaptor::ParseNsheadMeta(
    const Server&, const NsheadMessage& request, Controller* cntl,
    NsheadMeta* out_meta) const {
    butil::IOBufAsZeroCopyInputStream zc_stream(request.body);
    mcpack2pb::InputStream stream(&zc_stream);
    if (!::mcpack2pb::unbox(&stream)) {
        cntl->SetFailed(EREQUEST, "Request is not a compack/mcpack2 object");
        return;
    }
    mcpack2pb::ObjectIterator it1(&stream, request.body.size() - stream.popped_bytes());
    bool found_content = false;
    for (; it1 != NULL; ++it1) {
        if (it1->name == "content") {
            found_content = true;
            break;
        }
    }
    if (!found_content) {
        cntl->SetFailed(EREQUEST, "Fail to find request.content");
        return;
    }
    if (it1->value.type() != mcpack2pb::FIELD_ARRAY) {
        cntl->SetFailed(EREQUEST, "Expect request.content to be array, "
                        "actually %s", mcpack2pb::type2str(it1->value.type()));
        return;
    }

    mcpack2pb::ArrayIterator it2(it1->value);
    if (it2 == NULL) {
        cntl->SetFailed(EREQUEST, "Fail to parse request.content as array");
        return;
    }
    std::string service_name;
    std::string method_name;
    bool has_params = false;
    size_t user_req_offset = 0;
    size_t user_req_size = 0;
    for (mcpack2pb::ObjectIterator it3(*it2); it3 != NULL; ++it3) {
        if (it3->name == "service_name") {
            if (it3->value.type() != mcpack2pb::FIELD_STRING) {
                cntl->SetFailed(EREQUEST, "Expect request.content[0].service_name"
                                " to be string, actually %s",
                                mcpack2pb::type2str(it3->value.type()));
                return;
            }
            it3->value.as_string(&service_name, "request.content[0].service_name");
        } else if (it3->name == "method") {
            if (it3->value.type() != mcpack2pb::FIELD_STRING) {
                cntl->SetFailed(EREQUEST, "Expect request.content[0].method"
                                " to be string, actually %s",
                                mcpack2pb::type2str(it3->value.type()));
                return;
            }
            it3->value.as_string(&method_name, "request.content[0].method");
        } else if (it3->name == "id") {
            if (!mcpack2pb::is_primitive(it3->value.type()) ||
                !mcpack2pb::is_integral((mcpack2pb::PrimitiveFieldType)it3->value.type())) {
                cntl->SetFailed(ERESPONSE, "request.content[0].id must be "
                                "integer, actually %s",
                                mcpack2pb::type2str(it3->value.type()));
                return;
            }
            out_meta->set_correlation_id(
                it3->value.as_int64("request.content[0].id"));
        } else if (it3->name == "params") {
            if (it3->value.type() != mcpack2pb::FIELD_OBJECT) {
                cntl->SetFailed(EREQUEST, "Expect request.content[0].params "
                                "to be object, actually %s",
                                mcpack2pb::type2str(it3->value.type()));
                return;
            }
            has_params = true;
            user_req_offset = stream.popped_bytes();
            user_req_size = it3->value.size();
            const size_t stream_end = stream.popped_bytes() + it3->value.size();
            mcpack2pb::ObjectIterator it4(it3->value);
            if (it4 == NULL || it4.field_count() == 0) {
                cntl->SetFailed(EREQUEST, "Nothing in request.content[0].params");
                return;
            }
            if (it4.field_count() == 1) {
                user_req_offset = stream.popped_bytes();
                user_req_size = it4->value.size();
            }
            // Pop left bytes, otherwise ++it3 may complain about
            // "not fully consumed".
            if (stream_end > stream.popped_bytes()) {
                stream.popn(stream_end - stream.popped_bytes());
            }
        }
    }
    if (service_name.empty()) {
        cntl->SetFailed(EREQUEST, "Fail to find request.content[0].service_name");
        return;
    }
    if (method_name.empty()) {
        cntl->SetFailed(EREQUEST, "Fail to find request.content[0].method");
        return;
    }
    if (!has_params) {
        cntl->SetFailed(EREQUEST, "Fail to find request.content[0].params");
        return;
    }

    // Change request.body with the user's request.
    butil::IOBuf& buf = const_cast<butil::IOBuf&>(request.body);
    buf.pop_front(user_req_offset);
    if (buf.size() != user_req_size) {
        if (buf.size() < user_req_size) {
            cntl->SetFailed(EREQUEST, "request_size=%" PRIu64 " is shorter than"
                            "specified=%" PRIu64, (uint64_t)buf.size(),
                            (uint64_t)user_req_size);
            return;
        }
        buf.pop_back(buf.size() - user_req_size);
    }
    std::string full_method_name;
    full_method_name.reserve(service_name.size() + 1 + method_name.size());
    full_method_name.append(service_name);
    full_method_name.push_back('.');
    full_method_name.append(method_name);
    out_meta->set_full_method_name(full_method_name);
}