def cancel_by_local()

in src/dubbo/protocol/triple/call/client_call.py [0:0]


    def cancel_by_local(self, e: Exception) -> None:
        """Cancel this call from the client side.

        The call is marked as done on the first invocation; any later
        invocation is a no-op. The underlying client stream is only asked to
        cancel when a stream exists and the request headers have already
        been sent.

        :param e: The exception that triggered the cancellation; its string
            form is embedded in the status description.
        """
        if not self._done:
            # Flip the flag first so the call counts as finished even when
            # there is nothing to cancel on the stream.
            self._done = True

            if self._client_stream and self._headers_sent:
                cancel_status = TriRpcStatus(
                    code=GRpcCode.CANCELLED,
                    description=f"Call cancelled by client: {e}",
                )
                self._client_stream.cancel_by_local(cancel_status)