private BaseMessage readDataItem()

in src/main/java/com/uber/rss/execution/LocalFileStateStoreIterator.java [124:174]


  private BaseMessage readDataItem() {
    // read message type
    byte[] bytes = readBytes(Integer.BYTES);
    if (bytes == null) {
      closeCurrentFileStream();
      return null;
    }
    int messageType = ByteBufUtils.readInt(bytes, 0);
    // read length
    bytes = readBytes(Integer.BYTES);
    if (bytes == null) {
      logger.warn(String.format("Failed to read length field in state file %s", currentFile));
      closeCurrentFileStream();
      return null;
    }
    int length = ByteBufUtils.readInt(bytes, 0);
    if (length < 0) {
      logger.warn(String.format("Hit invalid length field %s in state file %s", length, currentFile));
      closeCurrentFileStream();
      return null;
    }
    // read bytes after length
    bytes = readBytes(length);
    if (bytes == null) {
      logger.warn(String.format("Failed to read payload field in state file %s", currentFile));
      closeCurrentFileStream();
      return null;
    }

    try {
      ByteBuf buf = Unpooled.wrappedBuffer(bytes);
      switch (messageType) {
        case MessageConstants.MESSAGE_StageInfoStateItem:
          return StageInfoStateItem.deserialize(buf);
        case MessageConstants.MESSAGE_TaskAttemptCommitStateItem:
          return TaskAttemptCommitStateItem.deserialize(buf);
        case MessageConstants.MESSAGE_AppDeletionStateItem:
          return AppDeletionStateItem.deserialize(buf);
        case MessageConstants.MESSAGE_StageCorruptionStateItem:
          return StageCorruptionStateItem.deserialize(buf);
        default:
          logger.warn(String.format("Hit unsupported message type %s in state file %s", messageType, currentFile));
          closeCurrentFileStream();
          return null;
      }
    } catch (Throwable ex) {
      logger.warn(String.format("Failed to deserialize message type %s from state file: %s", messageType, currentFile), ex);
      closeCurrentFileStream();
      return null;
    }
  }