def _get_status_from_trailers()

in src/dubbo/protocol/triple/stream/client_stream.py [0:0]


    def _get_status_from_trailers(self, trailers: Http2Headers) -> TriRpcStatus:
        """
        Derive the RPC status from the response trailers.

        Resolution order:

        1. If the gRPC status trailer (``TripleHeaderName.GRPC_STATUS``) is present,
           build the status from it and append the gRPC message trailer
           (``TripleHeaderName.GRPC_MESSAGE``) as description.
        2. If it is absent but response headers were already received
           (``self._headers_received``), report ``GRpcCode.UNKNOWN``.
        3. Otherwise infer the status from the HTTP status carried by ``trailers``,
           or report ``GRpcCode.INTERNAL`` when no usable HTTP status exists.

        A status value that cannot be parsed as an integer is reported as an error
        status instead of raising, since the value comes from the remote peer.

        :param trailers: The trailers.
        :type trailers: Http2Headers
        :return: The RPC status.
        :rtype: TriRpcStatus
        """
        # "-1" is the sentinel meaning "the gRPC status trailer is absent".
        raw_grpc_status = trailers.get(TripleHeaderName.GRPC_STATUS.value, "-1")
        try:
            grpc_status_code = int(raw_grpc_status)
        except (TypeError, ValueError):
            # Malformed value from the peer: surface it as a status rather than
            # letting the conversion error propagate.
            return TriRpcStatus(
                GRpcCode.UNKNOWN,
                description=f"Invalid GRPC status in response: {raw_grpc_status!r}",
            )

        if grpc_status_code != -1:
            status = TriRpcStatus.from_rpc_code(grpc_status_code)
            message = trailers.get(TripleHeaderName.GRPC_MESSAGE.value, "")
            status.append_description(message)
            return status

        # If the status code is not found, something is broken. Try to provide a rational error.
        if self._headers_received:
            return TriRpcStatus(GRpcCode.UNKNOWN, description="Missing GRPC status in response")

        # Try to get status from headers
        raw_http_status = trailers.status
        status_code = None
        http_status_problem = "Missing HTTP status code"
        if raw_http_status:
            try:
                status_code = int(raw_http_status)
            except (TypeError, ValueError):
                # Present but not numeric: keep going with an INTERNAL status.
                http_status_problem = f"Invalid HTTP status code: {raw_http_status!r}"

        if status_code is not None:
            status = TriRpcStatus.from_http_code(status_code)
        else:
            status = TriRpcStatus(GRpcCode.INTERNAL, description=http_status_problem)

        status.append_description("Missing GRPC status, please infer the error from the HTTP status code")
        return status