in src/dubbo/protocol/triple/stream/client_stream.py [0:0]
def _get_status_from_trailers(self, trailers: Http2Headers) -> TriRpcStatus:
"""
Validate the trailers.
:param trailers: The trailers.
:type trailers: Http2Headers
:return: The RPC status.
:rtype: TriRpcStatus
"""
grpc_status_code = int(trailers.get(TripleHeaderName.GRPC_STATUS.value, "-1"))
if grpc_status_code != -1:
status = TriRpcStatus.from_rpc_code(grpc_status_code)
message = trailers.get(TripleHeaderName.GRPC_MESSAGE.value, "")
status.append_description(message)
return status
# If the status code is not found , something is broken. Try to provide a rational error.
if self._headers_received:
return TriRpcStatus(GRpcCode.UNKNOWN, description="Missing GRPC status in response")
# Try to get status from headers
status_code = int(trailers.status) if trailers.status else None
if status_code is not None:
status = TriRpcStatus.from_http_code(status_code)
else:
status = TriRpcStatus(GRpcCode.INTERNAL, description="Missing HTTP status code")
status.append_description("Missing GRPC status, please infer the error from the HTTP status code")
return status