protected void decode()

in storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java [43:169]


    /**
     * Decodes as many complete frames as {@code buf} currently holds and appends the results
     * to {@code out}.
     *
     * <p>Every frame starts with a 2-byte code that selects one of four layouts:
     * <ol>
     *   <li>a control message (code only); {@code EOB_MESSAGE} is skipped, any other control
     *       message is emitted and ends this call;</li>
     *   <li>a SASL token: 4-byte length followed by the payload;</li>
     *   <li>a back-pressure status: 4-byte length followed by the serialized payload;</li>
     *   <li>a task message: the code is passed to {@code TaskMessage} as its first argument,
     *       followed by a 4-byte length and the payload.</li>
     * </ol>
     *
     * <p>Task messages decoded in one call are collected and added to {@code out} as a single
     * {@code List}. That list is always added before any control / SASL / back-pressure message
     * decoded later in the same call, so messages keep their wire order and none are dropped.
     * When a frame is only partially available, the reader index is rewound to the start of that
     * frame and decoding stops until more bytes arrive.
     *
     * @param ctx the channel handler context (not used by this method)
     * @param buf the buffer holding the received bytes
     * @param out the list that receives the decoded messages
     * @throws Exception if decoding a frame fails
     */
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
        // Every frame starts with a 2-byte code; without it nothing can be decoded yet.
        long available = buf.readableBytes();
        if (available < 2) {
            //need more data
            return;
        }

        // Task messages decoded during this call; handed to `out` as one list.
        List<Object> ret = new ArrayList<>();

        // Use a while loop to decode as many messages as possible in a single call.
        while (available >= 2) {

            // Mark the current buffer position before reading the code/length fields
            // because the whole frame might not be in the buffer yet.
            // We reset the buffer position to the marked position if
            // there are not enough bytes in the buffer.
            buf.markReaderIndex();

            // read the short field
            short code = buf.readShort();
            available -= 2;

            // case 1: Control message
            ControlMessage controlMessage = ControlMessage.mkMessage(code);
            if (controlMessage != null) {
                if (controlMessage == ControlMessage.EOB_MESSAGE) {
                    continue;
                }
                // Task messages read so far are already consumed from the buffer;
                // emit them first so they are not lost and stay ahead of the control message.
                flushTaskMessages(ret, out);
                out.add(controlMessage);
                return;
            }

            // case 2: SaslTokenMessageRequest
            if (code == SaslMessageToken.IDENTIFIER) {
                // Make sure that we have received at least an integer (length)
                if (buf.readableBytes() < 4) {
                    // need more data: rewind to the start of this frame only
                    buf.resetReaderIndex();
                    flushTaskMessages(ret, out);
                    return;
                }

                // Read the length field.
                int length = buf.readInt();
                if (length <= 0) {
                    flushTaskMessages(ret, out);
                    out.add(new SaslMessageToken(null));
                    return;
                }

                // Make sure there are enough bytes in the buffer.
                if (buf.readableBytes() < length) {
                    // The whole payload was not received yet.
                    buf.resetReaderIndex();
                    flushTaskMessages(ret, out);
                    return;
                }

                // There are enough bytes in the buffer. Read them.
                byte[] bytes = new byte[length];
                buf.readBytes(bytes);
                // Successfully decoded a frame.
                flushTaskMessages(ret, out);
                out.add(new SaslMessageToken(bytes));
                return;
            }

            // case 3: BackPressureStatus
            if (code == BackPressureStatus.IDENTIFIER) {
                available = buf.readableBytes();
                if (available < 4) {
                    // need more data
                    buf.resetReaderIndex();
                    flushTaskMessages(ret, out);
                    return;
                }
                int dataLen = buf.readInt();
                // Compare in long arithmetic: "4 + dataLen" would overflow int for a
                // dataLen close to Integer.MAX_VALUE and skip this guard.
                if (available - 4 < dataLen) {
                    // need more data
                    buf.resetReaderIndex();
                    flushTaskMessages(ret, out);
                    return;
                }
                byte[] bytes = new byte[dataLen];
                buf.readBytes(bytes);
                flushTaskMessages(ret, out);
                out.add(BackPressureStatus.read(bytes, deser));
                return;
            }

            // case 4: task Message

            // Make sure that we have received at least an integer (length)
            if (available < 4) {
                // need more data
                buf.resetReaderIndex();
                break;
            }

            // Read the length field.
            int length = buf.readInt();

            available -= 4;

            if (length <= 0) {
                // Task message without a payload.
                ret.add(new TaskMessage(code, null));
                break;
            }

            // Make sure there are enough bytes in the buffer.
            if (available < length) {
                // The whole payload was not received yet.
                buf.resetReaderIndex();
                break;
            }
            available -= length;

            // There are enough bytes in the buffer. Read them.
            byte[] bytes = new byte[length];
            buf.readBytes(bytes);

            // Successfully decoded a frame.
            ret.add(new TaskMessage(code, bytes));
        }

        flushTaskMessages(ret, out);
    }

    /** Adds the task messages decoded so far to {@code out} as one list, if there are any. */
    private static void flushTaskMessages(List<Object> ret, List<Object> out) {
        if (!ret.isEmpty()) {
            out.add(ret);
        }
    }