in src/main/java/com/uber/rss/execution/LocalFileStateStoreIterator.java [124:174]
/**
 * Reads one state item from the current state file stream.
 *
 * <p>As consumed here, a record is three consecutive fields: a 4-byte message type,
 * a 4-byte payload length, and then that many payload bytes. The payload is handed
 * to the {@code deserialize} method of the state item class matching the message type.
 *
 * <p>Every failure path closes the current file stream and returns {@code null}:
 * a field that cannot be read, a negative length, an unsupported message type, or
 * anything thrown while deserializing. All of them log a warning except a failed
 * read of the message type field, which is silent.
 *
 * @return the deserialized state item, or {@code null} if no item could be produced
 */
private BaseMessage readDataItem() {
    // Field 1: message type. A failed read here is not logged
    // (presumably this is the normal end of the file - TODO confirm against readBytes).
    final byte[] typeField = readBytes(Integer.BYTES);
    if (typeField == null) {
        closeCurrentFileStream();
        return null;
    }
    final int messageType = ByteBufUtils.readInt(typeField, 0);

    // Field 2: payload length. Once a type was read, a missing length is worth a warning.
    final byte[] lengthField = readBytes(Integer.BYTES);
    if (lengthField == null) {
        logger.warn(String.format("Failed to read length field in state file %s", currentFile));
        closeCurrentFileStream();
        return null;
    }
    final int payloadLength = ByteBufUtils.readInt(lengthField, 0);
    if (payloadLength < 0) {
        // A negative size cannot be read; stop processing this file.
        logger.warn(String.format("Hit invalid length field %s in state file %s", payloadLength, currentFile));
        closeCurrentFileStream();
        return null;
    }

    // Field 3: the payload itself.
    final byte[] payload = readBytes(payloadLength);
    if (payload == null) {
        logger.warn(String.format("Failed to read payload field in state file %s", currentFile));
        closeCurrentFileStream();
        return null;
    }

    try {
        final ByteBuf payloadBuf = Unpooled.wrappedBuffer(payload);
        if (messageType == MessageConstants.MESSAGE_StageInfoStateItem) {
            return StageInfoStateItem.deserialize(payloadBuf);
        } else if (messageType == MessageConstants.MESSAGE_TaskAttemptCommitStateItem) {
            return TaskAttemptCommitStateItem.deserialize(payloadBuf);
        } else if (messageType == MessageConstants.MESSAGE_AppDeletionStateItem) {
            return AppDeletionStateItem.deserialize(payloadBuf);
        } else if (messageType == MessageConstants.MESSAGE_StageCorruptionStateItem) {
            return StageCorruptionStateItem.deserialize(payloadBuf);
        }
        // None of the known state item types matched.
        logger.warn(String.format("Hit unsupported message type %s in state file %s", messageType, currentFile));
        closeCurrentFileStream();
        return null;
    } catch (Throwable ex) {
        // Deliberately broad: any failure while decoding is logged and reported as "no item".
        logger.warn(String.format("Failed to deserialize message type %s from state file: %s", messageType, currentFile), ex);
        closeCurrentFileStream();
        return null;
    }
}