private void decodeImpl()

in src/main/java/com/uber/rss/decoders/StreamServerMessageDecoder.java [131:304]


  /**
   * Advances the decoder state machine by one step using the bytes currently readable in
   * {@code in}.
   *
   * <p>Every invocation increments the incoming-request metric, even when it returns early.
   * For fixed-size fields (magic byte and version, message type, lengths, task attempt id) the
   * method returns without consuming anything until the whole field is readable. Shuffle data
   * payload is the exception: whatever is readable is copied into {@code shuffleDataBuffer}
   * and {@code requiredBytes} tracks how much is still missing.
   *
   * @param ctx channel context; used to close the connection on an invalid header and to
   *            describe the client in log and exception messages
   * @param in  buffer holding the bytes received so far
   * @param out receives each fully decoded control message or shuffle data wrapper
   * @throws RssInvalidDataException if a control message length, task attempt id, or data
   *                                 length read from the stream is negative
   * @throws RssException            if the decoder is in a state not handled here
   */
  private void decodeImpl(ChannelHandlerContext ctx,
                        ByteBuf in,
                        List<Object> out) {
    metrics.getNumIncomingRequests().inc(1);

    if (in.readableBytes() == 0) {
      return;
    }

    switch (state) {
      case READ_MAGIC_BYTE_AND_VERSION:
        // Magic byte and version are validated together, so wait until both are readable.
        if (in.readableBytes() < 2 * Byte.BYTES) {
          return;
        }
        readMagicByteAndVersion(ctx, in);
        return;
      case READ_MESSAGE_TYPE:
        if (in.readableBytes() < Integer.BYTES) {
          return;
        }
        int messageType = in.readInt();
        if (messageType < 0) {
          // A negative value is treated as a control message type.
          controlMessageType = messageType;
          state = State.READ_CONTROL_MESSAGE_LEN;
        } else {
          // A non-negative value is treated as the partition id of a data message.
          partitionId = messageType;
          state = State.READ_TASK_ATTEMPT_ID;
        }
        return;
      case READ_CONTROL_MESSAGE_LEN:
        if (in.readableBytes() < Integer.BYTES) {
          return;
        }
        requiredBytes = in.readInt();
        if (requiredBytes < 0) {
          throw new RssInvalidDataException(String.format(
              "Invalid control message length: %s, %s",
              requiredBytes, NettyUtils.getServerConnectionInfo(ctx)));
        }
        if (requiredBytes == 0) {
          // No payload to wait for: the control message can be emitted right away.
          Object controlMessage = getControlMessage(ctx, controlMessageType, in);
          out.add(controlMessage);
          resetData();
          state = State.READ_MESSAGE_TYPE;
        } else {
          state = State.READ_CONTROL_MESSAGE_BYTES;
        }
        return;
      case READ_CONTROL_MESSAGE_BYTES:
        // Control payloads are decoded only once all requiredBytes are readable.
        if (in.readableBytes() < requiredBytes) {
          return;
        }
        Object controlMessage = getControlMessage(ctx, controlMessageType, in);
        out.add(controlMessage);
        resetData();
        state = State.READ_MESSAGE_TYPE;
        return;
      case READ_TASK_ATTEMPT_ID:
        if (in.readableBytes() < Long.BYTES) {
          return;
        }
        in.readBytes(taskAttemptIdBytes);
        taskAttemptId = ByteBufUtils.readLong(taskAttemptIdBytes, 0);
        if (taskAttemptId < 0) {
          throw new RssInvalidDataException(String.format(
              "Invalid task attempt id: %s, %s",
              taskAttemptId, NettyUtils.getServerConnectionInfo(ctx)));
        }
        state = State.READ_DATA_MESSAGE_LEN;
        return;
      case READ_DATA_MESSAGE_LEN:
        if (in.readableBytes() < Integer.BYTES) {
          return;
        }
        int dataLen = in.readInt();
        if (dataLen < 0) {
          throw new RssInvalidDataException(String.format(
              "Invalid data length: %s, %s",
              dataLen, NettyUtils.getServerConnectionInfo(ctx)));
        }
        if (dataLen == 0) {
          // Zero-length data message: emit a wrapper immediately with size 0.
          out.add(createShuffleDataWrapper(in, 0));
          resetData();
          requiredBytes = 0;
          state = State.READ_MESSAGE_TYPE;
        } else {
          requiredBytes = dataLen;
          state = State.READ_DATA_MESSAGE_BYTES;
          // Start accumulating this message's payload from an empty buffer.
          shuffleDataBuffer.clear();
        }
        return;
      case READ_DATA_MESSAGE_BYTES:
        if (in.readableBytes() < requiredBytes) {
          // Partial payload: take everything available and remember how much is still missing.
          int count = in.readableBytes();
          shuffleDataBuffer.ensureWritable(count);
          in.readBytes(shuffleDataBuffer, count);
          requiredBytes -= count;
        } else {
          // The rest of the payload is available: finish the message and emit it.
          shuffleDataBuffer.ensureWritable(requiredBytes);
          in.readBytes(shuffleDataBuffer, requiredBytes);
          out.add(createShuffleDataWrapper(shuffleDataBuffer, shuffleDataBuffer.readableBytes()));
          requiredBytes = 0;
          resetData();
          state = State.READ_MESSAGE_TYPE;
        }
        return;
      default:
        throw new RssException(String.format(
            "Should not get incoming data in state %s, client %s",
            state, NettyUtils.getServerConnectionInfo(ctx)));
    }
  }

  /**
   * Reads and validates the connection header (magic byte, then version byte).
   *
   * <p>The caller must ensure at least two bytes are readable. On an unknown magic byte only
   * that byte is consumed; on a version mismatch both bytes are consumed. In either failure
   * case a warning is logged, the connection is closed, and {@code state} is left unchanged.
   * On success {@code state} becomes {@code READ_MESSAGE_TYPE}.
   *
   * @param ctx channel context used to close the connection and describe the client
   * @param in  buffer positioned at the magic byte
   */
  private void readMagicByteAndVersion(ChannelHandlerContext ctx, ByteBuf in) {
    byte magicByte = in.readByte();

    // Name used in the warning, and the only version accepted for this kind of uplink.
    final String uplinkName;
    final int expectedVersion;
    switch (magicByte) {
      case MessageConstants.UPLOAD_UPLINK_MAGIC_BYTE:
        uplinkName = "upload";
        expectedVersion = MessageConstants.UPLOAD_UPLINK_VERSION_3;
        break;
      case MessageConstants.DOWNLOAD_UPLINK_MAGIC_BYTE:
        uplinkName = "download";
        expectedVersion = MessageConstants.DOWNLOAD_UPLINK_VERSION_3;
        break;
      case MessageConstants.NOTIFY_UPLINK_MAGIC_BYTE:
        uplinkName = "notify";
        expectedVersion = MessageConstants.NOTIFY_UPLINK_VERSION_3;
        break;
      case MessageConstants.REGISTRY_UPLINK_MAGIC_BYTE:
        uplinkName = "registry";
        expectedVersion = MessageConstants.REGISTRY_UPLINK_VERSION_3;
        break;
      default: {
        String clientInfo = NettyUtils.getServerConnectionInfo(ctx);
        logger.warn(
            "Invalid magic byte {} from client {}",
            magicByte, clientInfo);
        ctx.close();
        logger.debug("Closed connection to client {}", clientInfo);
        return;
      }
    }

    byte version = in.readByte();
    if (version != expectedVersion) {
      String clientInfo = NettyUtils.getServerConnectionInfo(ctx);
      logger.warn(
          "Invalid {} version {} from client {}",
          uplinkName, version, clientInfo);
      ctx.close();
      logger.debug("Closed connection to client {}", clientInfo);
      return;
    }
    state = State.READ_MESSAGE_TYPE;
  }