private boolean onFrameStarted()

in runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/FrameDecoder.java [125:164]


  private boolean onFrameStarted(final ChannelHandlerContext ctx, final ByteBuf in) {
    assert (controlBodyBytesToRead == 0);
    assert (dataBodyBytesToRead == 0);
    assert (inputContext == null);

    if (in.readableBytes() < HEADER_LENGTH) {
      // cannot read a frame header frame now
      return false;
    }
    final byte flags = in.readByte();
    final int transferIndex = in.readInt();
    final long length = in.readUnsignedInt();
    if (length < 0) {
      throw new IllegalStateException(String.format("Frame length is negative: %d", length));
    }
    if ((flags & ((byte) (1 << 3))) == 0) {
      // setup context for reading control frame body
      controlBodyBytesToRead = length;
    } else {
      // setup context for reading data frame body
      dataBodyBytesToRead = length;
      final ByteTransferDataDirection dataDirection = (flags & ((byte) (1 << 2))) == 0
        ? ByteTransferDataDirection.INITIATOR_SENDS_DATA : ByteTransferDataDirection.INITIATOR_RECEIVES_DATA;
      final boolean newSubStreamFlag = (flags & ((byte) (1 << 1))) != 0;
      isLastFrame = (flags & ((byte) (1 << 0))) != 0;
      inputContext = contextManager.getInputContext(dataDirection, transferIndex);
      if (inputContext == null) {
        throw new IllegalStateException(String.format("Transport context for %s:%d was not found between the local"
            + "address %s and the remote address %s", dataDirection, transferIndex,
          ctx.channel().localAddress(), ctx.channel().remoteAddress()));
      }
      if (newSubStreamFlag) {
        inputContext.onNewStream();
      }
      if (dataBodyBytesToRead == 0) {
        onDataFrameEnd();
      }
    }
    return true;
  }