in src/dubbo/protocol/triple/invoker.py [0:0]
def _create_metadata(self, invocation: Invocation) -> RequestMetadata:
"""
Create the metadata.
:param invocation: The invocation.
:type invocation: Invocation
:return: The metadata.
:rtype: RequestMetadata
:raise ExtensionError: If the compressor is not supported.
"""
metadata = RequestMetadata()
# set service and method
metadata.service = invocation.get_service_name()
metadata.method = invocation.get_method_name()
# get scheme
metadata.scheme = (
TripleHeaderValue.HTTPS.value
if self._url.parameters.get(common_constants.SSL_ENABLED_KEY, False)
else TripleHeaderValue.HTTP.value
)
# get compressor
compression = self._url.parameters.get(common_constants.COMPRESSION_KEY, Identity.get_message_encoding())
if metadata.compressor.get_message_encoding() != compression:
try:
metadata.compressor = extensionLoader.get_extension(Compressor, compression)()
except ExtensionError as e:
_LOGGER.error("Unsupported compressor: %s", compression)
raise e
# get address
metadata.address = self._url.location
# TODO add more metadata
metadata.attachments[TripleHeaderName.TE.value] = TripleHeaderValue.TRAILERS.value
return metadata