in src/dubbo/protocol/triple/metadata.py [0:0]
def to_headers(self) -> Http2Headers:
    """
    Convert this request metadata to HTTP/2 headers.

    The pseudo-headers (scheme, authority, method, path) and the content
    type are always written. The remaining headers are passed through
    ``set_if_not_none`` and every attachment is appended with its value
    converted to ``str``.

    :return: The HTTP/2 headers.
    :rtype: Http2Headers
    """
    headers = Http2Headers()
    headers.scheme = self.scheme
    headers.authority = self.address
    headers.method = HttpMethod.POST.value
    headers.path = f"/{self.service}/{self.method}"
    headers.add(
        TripleHeaderName.CONTENT_TYPE.value,
        TripleHeaderValue.APPLICATION_GRPC_PROTO.value,
    )

    # The version header is skipped for "1.0.0" (presumably the default
    # service version -- confirm against the provider side).
    if self.version != "1.0.0":
        set_if_not_none(headers, TripleHeaderName.SERVICE_VERSION.value, self.version)
    set_if_not_none(headers, TripleHeaderName.GRPC_TIMEOUT.value, self.timeout)
    set_if_not_none(headers, TripleHeaderName.SERVICE_GROUP.value, self.group)
    set_if_not_none(headers, TripleHeaderName.CONSUMER_APP_NAME.value, self.application)

    # Accepted encodings go in their own header; GRPC_ENCODING is reserved
    # for the encoding actually applied to outgoing messages (set below).
    set_if_not_none(
        headers,
        TripleHeaderName.GRPC_ACCEPT_ENCODING.value,
        self.acceptEncoding,
    )

    # Only declare a message encoding when the compressor is not identity.
    message_encoding = self.compressor.get_message_encoding()
    if message_encoding != Identity.get_message_encoding():
        set_if_not_none(
            headers,
            TripleHeaderName.GRPC_ENCODING.value,
            message_encoding,
        )

    # Attachment values are stringified before being added as headers.
    for key, value in self.attachments.items():
        headers.add(key, str(value))
    return headers