def complete()

in src/dubbo/protocol/triple/stream/server_stream.py [0:0]


    def complete(self, status: TriRpcStatus, attachments: dict[str, Any]) -> None:
        """Finish the stream by sending a final headers frame with the RPC status.

        The frame is built from, in order: the HTTP status and content type
        (only when ``self.headers_sent`` is false), every entry of
        ``attachments``, the gRPC status code, and, for any non-OK code, the
        status description passed through ``TriRpcStatus.limit_desc``.

        Args:
            status: Final RPC status; ``status.code`` and
                ``status.description`` are read.
            attachments: Extra key/value pairs added to the frame as-is.
        """
        trailers = Http2Headers()

        # No headers have gone out yet, so this single frame must also carry
        # the HTTP status and content type.
        if not self.headers_sent:
            trailers.status = HttpStatus.OK.value
            trailers.add(
                TripleHeaderName.CONTENT_TYPE.value,
                TripleHeaderValue.APPLICATION_GRPC_PROTO.value,
            )

        # Add attachments. A plain loop is used because ``add`` is called for
        # its side effect only; no result list is needed.
        for key, value in attachments.items():
            trailers.add(key, value)

        # Add status; the message is attached only for non-OK codes.
        trailers.add(TripleHeaderName.GRPC_STATUS.value, status.code.value)
        if status.code is not GRpcCode.OK:
            trailers.add(
                TripleHeaderName.GRPC_MESSAGE.value,
                TriRpcStatus.limit_desc(status.description),
            )

        # Send trailers. Both flags are flipped before the frame is handed to
        # the underlying stream, which is also told to end the stream.
        self._headers_sent = True
        self._trailers_sent = True
        self._stream.send_headers(trailers, end_stream=True)