in dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java [92:229]
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
// get request id.
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
// decode response.
Response res = new Response(id);
if ((flag & FLAG_EVENT) != 0) {
res.setEvent(true);
}
// get status.
byte status = header[3];
res.setStatus(status);
try {
if (status == Response.OK) {
Object data;
if (res.isEvent()) {
byte[] eventPayload = CodecSupport.getPayload(is);
if (CodecSupport.isHeartBeat(eventPayload, proto)) {
// heart beat response data is always null;
data = null;
} else {
ObjectInput in = CodecSupport.deserialize(
channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);
data = decodeEventData(channel, in, eventPayload);
}
} else {
DecodeableRpcResult result;
Invocation inv = (Invocation) getRequestData(channel, res, id);
if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
if (customByteAccessor != null) {
result = customByteAccessor.getRpcResult(
channel, res, new UnsafeByteArrayInputStream(readMessageData(is)), inv, proto);
} else {
result = new DecodeableRpcResult(
channel, res, new UnsafeByteArrayInputStream(readMessageData(is)), inv, proto);
}
result.decode();
} else {
if (customByteAccessor != null) {
result = customByteAccessor.getRpcResult(
channel, res, new UnsafeByteArrayInputStream(readMessageData(is)), inv, proto);
} else {
result = new DecodeableRpcResult(
channel, res, new UnsafeByteArrayInputStream(readMessageData(is)), inv, proto);
}
}
data = result;
}
res.setResult(data);
} else {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
res.setErrorMessage(in.readUTF());
}
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn(PROTOCOL_FAILED_DECODE, "", "", "Decode response failed: " + t.getMessage(), t);
}
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
return res;
} else {
// decode request.
Request req;
try {
Object data;
if ((flag & FLAG_EVENT) != 0) {
byte[] eventPayload = CodecSupport.getPayload(is);
if (CodecSupport.isHeartBeat(eventPayload, proto)) {
// heart beat response data is always null;
req = new HeartBeatRequest(id);
req.setVersion(Version.getProtocolVersion());
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
((HeartBeatRequest) req).setProto(proto);
data = null;
} else {
req = new Request(id);
req.setVersion(Version.getProtocolVersion());
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
ObjectInput in = CodecSupport.deserialize(
channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);
data = decodeEventData(channel, in, eventPayload);
}
req.setEvent(true);
} else {
req = new Request(id);
req.setVersion(Version.getProtocolVersion());
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
// get data length.
int len = Bytes.bytes2int(header, 12);
req.setPayload(len);
DecodeableRpcInvocation inv;
if (isDecodeDataInIoThread(channel)) {
if (customByteAccessor != null) {
inv = customByteAccessor.getRpcInvocation(
channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto);
} else {
inv = new DecodeableRpcInvocation(
frameworkModel,
channel,
req,
new UnsafeByteArrayInputStream(readMessageData(is)),
proto);
}
inv.decode();
} else {
if (customByteAccessor != null) {
inv = customByteAccessor.getRpcInvocation(
channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto);
} else {
inv = new DecodeableRpcInvocation(
frameworkModel,
channel,
req,
new UnsafeByteArrayInputStream(readMessageData(is)),
proto);
}
}
data = inv;
}
req.setData(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn(PROTOCOL_FAILED_DECODE, "", "", "Decode request failed: " + t.getMessage(), t);
}
// bad request
req = new HeartBeatRequest(id);
req.setBroken(true);
req.setData(t);
}
return req;
}
}