def done_writing()

in src/dubbo/protocol/triple/streams.py [0:0]


    def done_writing(self, **kwargs) -> None:
        """
        Done writing to the stream.
        :param kwargs: The keyword arguments to pass to the done
        :raises RpcError: If done writing multiple times.
        """
        if self._write_done:
            raise RpcError("Done writing multiple times")

        # try to get TriRpcStatus from kwargs
        status = kwargs.get("tri_rpc_status")
        if status is None:
            status = TriRpcStatus(GRpcCode.OK)
        elif not isinstance(status, TriRpcStatus):
            raise RpcError("Invalid status type")

        # remove the status from kwargs
        kwargs.pop("tri-rpc-status", None)

        self._call.complete(status, kwargs)
        self._write_done = True