src/dubbo/protocol/invocation.py (37 lines of code) (raw):

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, Optional

from ._interfaces import Invocation


class RpcInvocation(Invocation):
    """
    The RpcInvocation class is an implementation of the Invocation interface.

    It stores the service name, method name and argument of a call, plus two
    mutable key/value maps: ``attachments`` (string values) and ``attributes``
    (arbitrary values).
    """

    __slots__ = [
        "_service_name",
        "_method_name",
        "_argument",
        "_attachments",
        "_attributes",
    ]

    def __init__(
        self,
        service_name: str,
        method_name: str,
        argument: Any,
        attachments: Optional[dict[str, str]] = None,
        attributes: Optional[dict[str, Any]] = None,
    ):
        """
        Initialize a new RpcInvocation instance.

        A dict passed for ``attachments`` or ``attributes`` is stored by
        reference (not copied), whether or not it is empty. A fresh dict is
        created only when the argument is ``None``.

        :param service_name: The service name.
        :type service_name: str
        :param method_name: The method name.
        :type method_name: str
        :param argument: The argument.
        :type argument: Any
        :param attachments: The attachments.
        :type attachments: Optional[dict[str, str]]
        :param attributes: The attributes.
        :type attributes: Optional[dict[str, Any]]
        """
        self._service_name = service_name
        self._method_name = method_name
        self._argument = argument
        # Test against None explicitly: `x or {}` would also discard an empty
        # dict supplied by the caller, so empty and non-empty dicts would be
        # aliased differently.
        self._attachments = {} if attachments is None else attachments
        self._attributes = {} if attributes is None else attributes

    def add_attachment(self, key: str, value: str) -> None:
        """
        Set an attachment, overwriting any existing value for the key.

        :param key: The attachment key.
        :type key: str
        :param value: The attachment value.
        :type value: str
        """
        self._attachments[key] = value

    def get_attachment(self, key: str) -> Optional[str]:
        """
        Look up an attachment.

        :param key: The attachment key.
        :type key: str
        :return: The stored value, or None if the key is not present.
        :rtype: Optional[str]
        """
        return self._attachments.get(key)

    def add_attribute(self, key: str, value: Any) -> None:
        """
        Set an attribute, overwriting any existing value for the key.

        :param key: The attribute key.
        :type key: str
        :param value: The attribute value.
        :type value: Any
        """
        self._attributes[key] = value

    def get_attribute(self, key: str) -> Optional[Any]:
        """
        Look up an attribute.

        :param key: The attribute key.
        :type key: str
        :return: The stored value, or None if the key is not present.
        :rtype: Optional[Any]
        """
        return self._attributes.get(key)

    def get_service_name(self) -> str:
        """
        Return the service name given at construction.

        :return: The service name.
        :rtype: str
        """
        return self._service_name

    def get_method_name(self) -> str:
        """
        Return the method name given at construction.

        :return: The method name.
        :rtype: str
        """
        return self._method_name

    def get_argument(self) -> Any:
        """
        Return the argument given at construction.

        :return: The argument.
        :rtype: Any
        """
        return self._argument