private boolean onControlBodyAdded()

in runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/FrameDecoder.java [174:223]


  private boolean onControlBodyAdded(final ByteBuf in, final List out)
    throws InvalidProtocolBufferException {
    ContextManager.getEncoderDecoderLock().lock();
    in.markReaderIndex();
    try {
      assert (controlBodyBytesToRead > 0);
      assert (dataBodyBytesToRead == 0);
      assert (inputContext == null);

      assert (controlBodyBytesToRead <= Integer.MAX_VALUE);

      int i = 0;
      while (in.readableBytes() < controlBodyBytesToRead) {
        // cannot read body now
        LOG.warn("ControlMessage cannot be read ({})", i);
        ContextManager.getEncoderDecoderLock().unlock();
        Thread.sleep(200);
        ContextManager.getEncoderDecoderLock().lock();
        i++;
        if (i > 10) {
          in.resetReaderIndex();
          return false;
        }
      }

      final byte[] bytes;
      final int offset;
      if (in.hasArray()) {
        bytes = in.array();
        offset = in.arrayOffset() + in.readerIndex();
      } else {
        bytes = new byte[(int) controlBodyBytesToRead];
        in.getBytes(in.readerIndex(), bytes, 0, (int) controlBodyBytesToRead);
        offset = 0;
      }
      final ByteTransferContextSetupMessage controlMessage
        = ByteTransferContextSetupMessage.PARSER.parseFrom(bytes, offset, (int) controlBodyBytesToRead);
      LOG.debug("ControlMessage decoded: {}", controlMessage);

      out.add(controlMessage);
      in.skipBytes((int) controlBodyBytesToRead);
      controlBodyBytesToRead = 0;
      return true;
    } catch (InterruptedException e) {
      e.printStackTrace();
      return false;
    } finally {
      ContextManager.getEncoderDecoderLock().unlock();
    }
  }