in src/dubbo/protocol/triple/call/client_call.py [0:0]
def cancel_by_local(self, e: Exception) -> None:
if self._done:
return
self._done = True
if not self._client_stream or not self._headers_sent:
return
status = TriRpcStatus(
code=GRpcCode.CANCELLED,
description=f"Call cancelled by client: {e}",
)
self._client_stream.cancel_by_local(status)