def _create_metadata()

in src/dubbo/protocol/triple/invoker.py [0:0]


    def _create_metadata(self, invocation: Invocation) -> RequestMetadata:
        """
        Build the request metadata for the given invocation.

        The service and method names are taken from the invocation; the
        scheme, compressor and address are taken from this invoker's URL.

        :param invocation: The invocation to build the metadata for.
        :type invocation: Invocation
        :return: The populated request metadata.
        :rtype: RequestMetadata
        :raise ExtensionError: Re-raised (after being logged) when the
            requested compressor cannot be obtained from the extension loader.
        """
        request_metadata = RequestMetadata()

        # Target service and method come straight from the invocation.
        request_metadata.service = invocation.get_service_name()
        request_metadata.method = invocation.get_method_name()

        url_params = self._url.parameters

        # Scheme: HTTPS when the SSL flag in the URL parameters is truthy,
        # plain HTTP otherwise.
        if url_params.get(common_constants.SSL_ENABLED_KEY, False):
            request_metadata.scheme = TripleHeaderValue.HTTPS.value
        else:
            request_metadata.scheme = TripleHeaderValue.HTTP.value

        # Compressor: fall back to the Identity encoding when the URL does not
        # name one, and only replace the compressor already held by the
        # metadata when the requested encoding differs from it.
        requested_encoding = url_params.get(
            common_constants.COMPRESSION_KEY,
            Identity.get_message_encoding(),
        )
        current_encoding = request_metadata.compressor.get_message_encoding()
        if current_encoding != requested_encoding:
            try:
                compressor_factory = extensionLoader.get_extension(Compressor, requested_encoding)
                request_metadata.compressor = compressor_factory()
            except ExtensionError as e:
                _LOGGER.error("Unsupported compressor: %s", requested_encoding)
                raise e

        # Address of the remote endpoint.
        request_metadata.address = self._url.location

        # TODO add more metadata
        te_header = TripleHeaderName.TE.value
        request_metadata.attachments[te_header] = TripleHeaderValue.TRAILERS.value

        return request_metadata