in storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java [43:169]
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
// Make sure that we have received at least a short
long available = buf.readableBytes();
if (available < 2) {
//need more data
return;
}
List<Object> ret = new ArrayList<>();
// Use while loop, try to decode as more messages as possible in single call
while (available >= 2) {
// Mark the current buffer position before reading task/len field
// because the whole frame might not be in the buffer yet.
// We will reset the buffer position to the marked position if
// there's not enough bytes in the buffer.
buf.markReaderIndex();
// read the short field
short code = buf.readShort();
available -= 2;
// case 1: Control message
ControlMessage controlMessage = ControlMessage.mkMessage(code);
if (controlMessage != null) {
if (controlMessage == ControlMessage.EOB_MESSAGE) {
continue;
} else {
out.add(controlMessage);
return;
}
}
//case 2: SaslTokenMessageRequest
if (code == SaslMessageToken.IDENTIFIER) {
// Make sure that we have received at least an integer (length)
if (buf.readableBytes() < 4) {
//need more data
buf.resetReaderIndex();
return;
}
// Read the length field.
int length = buf.readInt();
if (length <= 0) {
out.add(new SaslMessageToken(null));
return;
}
// Make sure if there's enough bytes in the buffer.
if (buf.readableBytes() < length) {
// The whole bytes were not received yet - return null.
buf.resetReaderIndex();
return;
}
// There's enough bytes in the buffer. Read it.
byte[] bytes = new byte[length];
buf.readBytes(bytes);
// Successfully decoded a frame.
// Return a SaslTokenMessageRequest object
out.add(new SaslMessageToken(bytes));
return;
}
// case 3: BackPressureStatus
if (code == BackPressureStatus.IDENTIFIER) {
available = buf.readableBytes();
if (available < 4) {
//Need more data
buf.resetReaderIndex();
return;
}
int dataLen = buf.readInt();
if (available < 4 + dataLen) {
// need more data
buf.resetReaderIndex();
return;
}
byte[] bytes = new byte[dataLen];
buf.readBytes(bytes);
out.add(BackPressureStatus.read(bytes, deser));
return;
}
// case 4: task Message
// Make sure that we have received at least an integer (length)
if (available < 4) {
// need more data
buf.resetReaderIndex();
break;
}
// Read the length field.
int length = buf.readInt();
available -= 4;
if (length <= 0) {
ret.add(new TaskMessage(code, null));
break;
}
// Make sure if there's enough bytes in the buffer.
if (available < length) {
// The whole bytes were not received yet - return null.
buf.resetReaderIndex();
break;
}
available -= length;
// There's enough bytes in the buffer. Read it.
byte[] bytes = new byte[length];
buf.readBytes(bytes);
// Successfully decoded a frame.
// Return a TaskMessage object
ret.add(new TaskMessage(code, bytes));
}
if (!ret.isEmpty()) {
out.add(ret);
}
}