def on_headers()

in src/dubbo/protocol/triple/stream/server_stream.py [0:0]


    def on_headers(self, headers: Http2Headers, end_stream: bool) -> None:
        """
        Validate incoming request headers and, if they pass, wire up the call.

        The checks run in a fixed order and the first failure ends processing:

        1. HTTP method must equal ``HttpMethod.POST.value``.
        2. Content-Type must start with ``TripleHeaderValue.APPLICATION_GRPC.value``.
        3. The path must be present and start with ``/``.
        4. The path must split on ``/`` into exactly three pieces
           (an empty prefix, the service name and the method name).
        5. ``self._get_handler`` must return a handler for that service/method.
        6. ``end_stream`` must be false; otherwise the request is dropped
           without any response.
        7. A ``grpc-encoding`` that differs from the identity encoding must
           resolve to a registered ``Decompressor`` extension.

        Failures in steps 1-3 are reported through
        ``self._response_plain_text_error`` together with an HTTP status;
        failures in steps 4, 5 and 7 are reported through
        ``self._response_error``.

        On success ``self._listener`` and ``self._decoder`` are assigned and
        the headers (as a dict) are forwarded to the listener.

        :param headers: The headers received for this stream.
        :type headers: Http2Headers
        :param end_stream: Whether the stream was already ended with these headers.
        :type end_stream: bool
        """
        # --- HTTP method -----------------------------------------------------
        http_method = headers.method
        if http_method != HttpMethod.POST.value:
            method_status = TriRpcStatus(
                GRpcCode.INTERNAL,
                description=f"Method {http_method} is not supported",
            )
            self._response_plain_text_error(HttpStatus.METHOD_NOT_ALLOWED.value, method_status)
            return

        # --- Content-Type ----------------------------------------------------
        media_type = headers.get(TripleHeaderName.CONTENT_TYPE.value, "")
        if not media_type.startswith(TripleHeaderValue.APPLICATION_GRPC.value):
            # An empty value means the header was absent (or blank).
            if media_type:
                media_reason = f"Content-Type {media_type} is not supported"
            else:
                media_reason = "Content-Type is missing from the request"
            self._response_plain_text_error(
                HttpStatus.UNSUPPORTED_MEDIA_TYPE.value,
                TriRpcStatus(GRpcCode.UNIMPLEMENTED, description=media_reason),
            )
            return

        # --- Path presence and leading slash -----------------------------------
        request_path = headers.path
        path_problem = None
        if not request_path:
            path_problem = "Expected path but is missing"
        elif not request_path.startswith("/"):
            path_problem = f"Expected path to start with /: {request_path}"

        if path_problem is not None:
            # Both path failures share the same HTTP status and gRPC code.
            self._response_plain_text_error(
                HttpStatus.NOT_FOUND.value,
                TriRpcStatus(GRpcCode.UNIMPLEMENTED, description=path_problem),
            )
            return

        # --- Path shape: "/<service>/<method>" ---------------------------------
        segments = request_path.split("/")
        if len(segments) != 3:
            bad_path_status = TriRpcStatus(
                GRpcCode.UNIMPLEMENTED,
                description=f"Bad path format: {request_path}",
            )
            self._response_error(bad_path_status)
            return

        # The first segment is the empty string before the leading slash.
        _, service_name, method_name = segments

        # --- Handler lookup ------------------------------------------------------
        method_handler = self._get_handler(service_name, method_name)
        if not method_handler:
            missing_status = TriRpcStatus(
                GRpcCode.UNIMPLEMENTED,
                description=f"Service {service_name} is not found",
            )
            self._response_error(missing_status)
            return

        # Headers that already close the stream form an invalid request;
        # it is ignored without sending anything back.
        if end_stream:
            return

        # --- Message decompressor ------------------------------------------------
        # Start from the identity decompressor and replace it only when the
        # request names a different encoding.
        codec: Decompressor = Identity()
        requested_encoding = headers.get(TripleHeaderName.GRPC_ENCODING.value)
        needs_lookup = bool(requested_encoding) and requested_encoding != codec.get_message_encoding()
        if needs_lookup:
            try:
                codec_factory = extensionLoader.get_extension(Decompressor, requested_encoding)
                codec = codec_factory()
            except ExtensionError:
                encoding_status = TriRpcStatus(
                    GRpcCode.UNIMPLEMENTED,
                    description=f"Grpc-encoding '{requested_encoding}' is not supported",
                )
                self._response_error(encoding_status)
                return

        # --- Wire up the call ------------------------------------------------------
        # The server call acts as the listener for this stream.
        server_stream = TripleServerStream(self._stream)
        self._listener = TripleServerCall(server_stream, method_handler, self._executor)

        # The decoder hands decoded messages to the listener through an adapter.
        decoder_listener = ServerTransportListener.ServerDecoderListener(self._listener)
        self._decoder = TriDecoder(decoder_listener, codec)

        # Finally pass the headers on to the listener.
        header_map = headers.to_dict()
        self._listener.on_headers(header_map)