in src/dubbo/protocol/triple/streams.py [0:0]
def done_writing(self, **kwargs) -> None:
    """
    Done writing to the stream.

    The optional ``tri_rpc_status`` keyword argument carries the final
    status of the call. It is removed from ``kwargs`` before the remaining
    keyword arguments are handed to ``self._call.complete``.

    :param kwargs: The keyword arguments to pass to the done. May contain
        ``tri_rpc_status`` (a ``TriRpcStatus``); when it is absent or
        ``None``, a ``TriRpcStatus(GRpcCode.OK)`` is used.
    :raises RpcError: If done writing multiple times, or if
        ``tri_rpc_status`` is not a ``TriRpcStatus``.
    """
    if self._write_done:
        raise RpcError("Done writing multiple times")

    # Extract AND remove the status in one step. The key must be the same
    # one callers pass as a keyword argument ("tri_rpc_status"); otherwise
    # the status object would remain in kwargs and be forwarded to
    # complete() alongside the other keyword arguments.
    status = kwargs.pop("tri_rpc_status", None)
    if status is None:
        status = TriRpcStatus(GRpcCode.OK)
    elif not isinstance(status, TriRpcStatus):
        raise RpcError("Invalid status type")

    self._call.complete(status, kwargs)
    # Only mark the stream as finished once complete() has returned.
    self._write_done = True