in core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java [76:143]
/**
 * Decodes a single v0 frame from {@code in} into an {@link RpcMessage}.
 *
 * <p>The header is read in this order: a 2-byte protocol value (consumed but not
 * inspected), a 2-byte flag field, a 2-byte field that is the body length when the
 * Seata-codec flag is clear or the message type code when it is set, and an 8-byte
 * message id.
 *
 * <p>Heartbeat frames carry no body: the message is marked async and its body is set to
 * {@code HeartbeatMessage.PING} for requests or {@code HeartbeatMessage.PONG} otherwise.
 * For every other frame, all remaining readable bytes are treated as the body, prefixed
 * with the 2-byte type code (high byte first) and handed to the serializer loaded for
 * {@code ProtocolConstants.VERSION_0}.
 *
 * @param in buffer positioned at the start of a frame
 * @return the decoded message, converted via {@code protocolMsg2RpcMsg()}
 * @throws IllegalArgumentException if fewer than {@code ProtocolConstantsV0.HEAD_LENGTH}
 *         bytes are readable, or if a positive body length exceeds the bytes remaining
 *         after the header (the reader index is reset to the frame start in that case)
 */
public RpcMessage decodeFrame(ByteBuf in) {
    ProtocolRpcMessageV0 rpcMessage = new ProtocolRpcMessageV0();
    if (in.readableBytes() < ProtocolConstantsV0.HEAD_LENGTH) {
        throw new IllegalArgumentException("Nothing to decode.");
    }

    in.markReaderIndex();
    // The leading protocol short is not used by this decoder, but it must still be
    // consumed so the following reads line up with the header layout.
    in.readShort();
    int flag = in.readShort();

    boolean isHeartbeat = (ProtocolConstantsV0.FLAG_HEARTBEAT & flag) > 0;
    boolean isRequest = (ProtocolConstantsV0.FLAG_REQUEST & flag) > 0;
    boolean isSeataCodec = (ProtocolConstantsV0.FLAG_SEATA_CODEC & flag) > 0;
    rpcMessage.setSeataCodec(isSeataCodec);

    // The third header short is overloaded: body length for the non-Seata codec,
    // message type code for the Seata codec. The other value stays 0.
    // NOTE(review): the length is read as a signed short, so a value above 32767 would
    // turn negative and skip the length check below - confirm against the encoder.
    short bodyLength = 0;
    short typeCode = 0;
    if (!isSeataCodec) {
        bodyLength = in.readShort();
    } else {
        typeCode = in.readShort();
    }

    long msgId = in.readLong();
    rpcMessage.setId(msgId);

    if (isHeartbeat) {
        rpcMessage.setAsync(true);
        rpcMessage.setHeartbeat(isHeartbeat);
        rpcMessage.setRequest(isRequest);
        if (isRequest) {
            rpcMessage.setBody(HeartbeatMessage.PING);
        } else {
            rpcMessage.setBody(HeartbeatMessage.PONG);
        }
        return rpcMessage.protocolMsg2RpcMsg();
    }

    if (bodyLength > 0 && in.readableBytes() < bodyLength) {
        // Rewind to the marked frame start before reporting the short read.
        in.resetReaderIndex();
        throw new IllegalArgumentException("readableBytes < bodyLength");
    }

    rpcMessage.setAsync((ProtocolConstantsV0.FLAG_ASYNC & flag) > 0);
    rpcMessage.setHeartbeat(false);
    rpcMessage.setRequest(isRequest);

    try {
        int length = in.readableBytes();

        // fill messageType in v0: the serializer input is the 2-byte type code followed
        // by the body. Read the body straight into its final position instead of going
        // through a temporary array and a second copy.
        byte[] payload = new byte[2 + length];
        payload[0] = (byte) (0x00FF & (typeCode >> 8));
        payload[1] = (byte) (0x00FF & typeCode);
        in.readBytes(payload, 2, length);

        byte codecType = isSeataCodec ? SerializerType.SEATA.getCode() : SerializerType.HESSIAN.getCode();
        Serializer serializer =
                SerializerServiceLoader.load(SerializerType.getByCode(codecType), ProtocolConstants.VERSION_0);
        rpcMessage.setBody(serializer.deserialize(payload));
    } catch (Exception e) {
        LOGGER.error("decode error", e);
        throw e;
    }

    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("Receive:" + rpcMessage.getBody() + ", messageId:" + msgId);
    }
    return rpcMessage.protocolMsg2RpcMsg();
}