in src/dubbo/protocol/triple/call/client_call.py [0:0]
def send_message(self, message: Any, last: bool) -> None:
if self._done:
_LOGGER.warning("Call is done, cannot send message")
return
# check if headers are sent
if not self._headers_sent:
# send headers
self._headers_sent = True
self._client_stream.send_headers(self._request_metadata.to_headers())
# send message
try:
data = FunctionHelper.call_func(self._serializer.serialize, message) if message else b""
compress_flag = 0 if self._compressor.get_message_encoding() == Identity.get_message_encoding() else 1
self._client_stream.send_message(data, compress_flag, last)
except SerializationError as e:
_LOGGER.error("Failed to serialize message: %s", e)
# close the stream
self.cancel_by_local(e)
# close the listener
status = TriRpcStatus(
code=GRpcCode.INTERNAL,
description="Failed to serialize message",
)
self._listener.on_close(status, {})