in src/dubbo/protocol/triple/stream/server_stream.py [0:0]
def complete(self, status: TriRpcStatus, attachments: dict[str, Any]) -> None:
    """Finish the stream by sending the trailing header block.

    Builds one ``Http2Headers`` block containing the attachments and the
    status headers, then sends it with ``end_stream=True``. When
    ``self.headers_sent`` is falsy, the HTTP status and content-type are
    put into the same block, so headers and trailers go out together.

    :param status: RPC status; ``status.code.value`` is written under
        ``TripleHeaderName.GRPC_STATUS`` and, for any code other than
        ``GRpcCode.OK``, ``status.description`` (passed through
        ``TriRpcStatus.limit_desc``) is written under
        ``TripleHeaderName.GRPC_MESSAGE``.
    :param attachments: Extra key/value pairs; each one is added to the
        trailing header block as-is.
    """
    trailers = Http2Headers()

    # No response headers have gone out yet: fold the HTTP status and
    # content-type into this block.
    # NOTE(review): `headers_sent` is read here while `_headers_sent` is
    # written below - presumably a property over that attribute; confirm.
    if not self.headers_sent:
        trailers.status = HttpStatus.OK.value
        trailers.add(
            TripleHeaderName.CONTENT_TYPE.value,
            TripleHeaderValue.APPLICATION_GRPC_PROTO.value,
        )

    # add attachments (plain loop: `add` is called only for its side effect)
    for key, value in attachments.items():
        trailers.add(key, value)

    # add status
    trailers.add(TripleHeaderName.GRPC_STATUS.value, status.code.value)
    if status.code is not GRpcCode.OK:
        # The description is only attached for non-OK results.
        trailers.add(
            TripleHeaderName.GRPC_MESSAGE.value,
            TriRpcStatus.limit_desc(status.description),
        )

    # send trailers; both flags are set before the send call, as in the
    # original ordering.
    self._headers_sent = True
    self._trailers_sent = True
    self._stream.send_headers(trailers, end_stream=True)