in src/dubbo/protocol/triple/coders.py [0:0]
def encode(self, message: bytes, compress_flag: int) -> bytes:
    """
    Encode the message into the gRPC length-prefixed message format.

    The result is a 5-byte header (a 1-byte compress flag followed by a
    4-byte big-endian payload length) followed by the payload itself. When
    ``compress_flag`` is 1 the payload is first passed through
    ``self._compressor`` and the header carries the compressed length.

    :param message: The message to encode.
    :type message: bytes
    :param compress_flag: The compress flag. 0 for no compression, 1 for compression.
    :type compress_flag: int
    :return: The encoded message (header followed by payload).
    :rtype: bytes
    :raises RpcError: If ``compress_flag`` is neither 0 nor 1, if the
        message is larger than ``DEFAULT_MAX_MESSAGE_SIZE``, or if
        ``compress_flag`` is 1 but ``self._compressor`` is not set.
    """
    # check compress_flag
    if compress_flag not in [0, 1]:
        raise RpcError(f"compress_flag must be 0 or 1, but got {compress_flag}")

    # check message size (measured on the payload before any compression)
    message_size = len(message)
    if message_size > DEFAULT_MAX_MESSAGE_SIZE:
        # Report the configured limit rather than a hard-coded number so the
        # message stays correct if DEFAULT_MAX_MESSAGE_SIZE is ever changed.
        raise RpcError(
            f"Message too large. Allowed maximum size is {DEFAULT_MAX_MESSAGE_SIZE} bytes, "
            f"but got {message_size} bytes."
        )

    # check compress_flag and compress the payload
    if compress_flag == 1:
        if not self._compressor:
            raise RpcError("compression is required when compress_flag is 1")
        message = self._compressor.compress(message)

    # Create the gRPC header
    # >: big-endian
    # B: unsigned char(1 byte) -> compressed_flag
    # I: unsigned int(4 bytes) -> message_length
    # The length is taken after compression, so it describes the bytes
    # that actually follow the header.
    header = struct.pack(">BI", compress_flag, len(message))
    return header + message