in src/dubbo/protocol/triple/stream/server_stream.py [0:0]
def on_headers(self, headers: Http2Headers, end_stream: bool) -> None:
    """Validate incoming request headers and set up the server call.

    The checks run in a fixed order; the first one that fails sends an
    error response and stops processing:

    1. the HTTP method must be POST,
    2. the content type must start with the gRPC content type,
    3. the path must be present and start with "/",
    4. the path must split into exactly "/<service>/<method>",
    5. a handler must exist for that service/method pair,
    6. the headers must not end the stream,
    7. a non-identity ``grpc-encoding`` must name a loadable decompressor.

    When every check passes, a ``TripleServerCall`` is stored on
    ``self._listener``, a ``TriDecoder`` is stored on ``self._decoder``, and
    the headers (as a dict) are handed to the new call.

    :param headers: The HTTP/2 request headers.
    :param end_stream: Whether the headers frame also ends the stream.
    """
    # 1. HTTP method
    http_method = headers.method
    if http_method != HttpMethod.POST.value:
        method_status = TriRpcStatus(
            GRpcCode.INTERNAL,
            description=f"Method {http_method} is not supported",
        )
        self._response_plain_text_error(HttpStatus.METHOD_NOT_ALLOWED.value, method_status)
        return

    # 2. Content type
    content_type = headers.get(TripleHeaderName.CONTENT_TYPE.value, "")
    if not content_type.startswith(TripleHeaderValue.APPLICATION_GRPC.value):
        if content_type:
            content_type_problem = f"Content-Type {content_type} is not supported"
        else:
            content_type_problem = "Content-Type is missing from the request"
        self._response_plain_text_error(
            HttpStatus.UNSUPPORTED_MEDIA_TYPE.value,
            TriRpcStatus(GRpcCode.UNIMPLEMENTED, description=content_type_problem),
        )
        return

    # 3. Path presence and leading slash (both failures share the same codes)
    request_path = headers.path
    if not request_path:
        path_problem = "Expected path but is missing"
    elif not request_path.startswith("/"):
        path_problem = f"Expected path to start with /: {request_path}"
    else:
        path_problem = None
    if path_problem is not None:
        self._response_plain_text_error(
            HttpStatus.NOT_FOUND.value,
            TriRpcStatus(GRpcCode.UNIMPLEMENTED, description=path_problem),
        )
        return

    # 4. Path shape: a leading "/" yields an empty first segment, so a valid
    #    "/<service>/<method>" path splits into exactly three pieces.
    segments = request_path.split("/")
    if len(segments) != 3:
        self._response_error(
            TriRpcStatus(GRpcCode.UNIMPLEMENTED, description=f"Bad path format: {request_path}")
        )
        return
    _, service_name, method_name = segments

    # 5. Handler lookup
    handler = self._get_handler(service_name, method_name)
    if not handler:
        not_found_status = TriRpcStatus(
            GRpcCode.UNIMPLEMENTED,
            description=f"Service {service_name} is not found",
        )
        self._response_error(not_found_status)
        return

    # 6. A headers frame that also ends the stream is treated as invalid and
    #    dropped here without sending any response.
    if end_stream:
        return

    # 7. Pick the decompressor: identity unless the request names another one.
    decompressor: Decompressor = Identity()
    requested_encoding = headers.get(TripleHeaderName.GRPC_ENCODING.value)
    if requested_encoding and requested_encoding != decompressor.get_message_encoding():
        try:
            decompressor = extensionLoader.get_extension(Decompressor, requested_encoding)()
        except ExtensionError:
            encoding_status = TriRpcStatus(
                GRpcCode.UNIMPLEMENTED,
                description=f"Grpc-encoding '{requested_encoding}' is not supported",
            )
            self._response_error(encoding_status)
            return

    # Wire up the server call and its decoder, then pass the headers on.
    server_call = TripleServerCall(TripleServerStream(self._stream), handler, self._executor)
    self._listener = server_call
    self._decoder = TriDecoder(ServerTransportListener.ServerDecoderListener(server_call), decompressor)
    server_call.on_headers(headers.to_dict())