in src/main/java/com/uber/rss/tools/PartitionFileChecker.java [41:111]
/**
 * Checks that the partition file at {@code filePath} can be decoded end to end.
 *
 * <p>The check runs in three stages:
 * <ol>
 *   <li>Read the file, optionally decoded with {@code fileCompressCodec}, into one concatenated
 *       data-block stream.</li>
 *   <li>Decompress each block according to {@code blockCompressCodec}.</li>
 *   <li>Walk every key/value record in the uncompressed stream.</li>
 * </ol>
 * One line is printed to stdout for each data block read from the file.
 *
 * @throws RuntimeException wrapping the cause if the file cannot be opened or read, or if its
 *     data-block entries are truncated or have an invalid length
 * @throws RssInvalidDataException if a compressed block header is invalid or a block does not
 *     decompress cleanly to its declared size
 * @throws IndexOutOfBoundsException if the stream ends in the middle of a block header or record
 */
public void run() {
    ByteBuf dataBlockStreamData = readDataBlockStream();
    ByteBuf dataBlockStreamUncompressedData = decompressDataBlockStream(dataBlockStreamData);
    checkRecordStream(dataBlockStreamUncompressedData);
}

/**
 * Reads every data block entry from the partition file and concatenates the block payloads.
 *
 * <p>Each entry is read as a long task attempt id, an int payload length, and then that many
 * payload bytes. Reaching end of file where the next task attempt id would start ends the read
 * normally. Reaching it anywhere inside an entry is reported as invalid data.
 */
private ByteBuf readDataBlockStream() {
    ByteBuf dataBlockStreamData = Unpooled.buffer(1000);
    // Both streams are declared as resources. The codec wrapper may hold decoder state (zstd-jni
    // allocates a native stream) that only its close() releases. Keeping fileInputStream separate
    // guarantees it is still closed if the wrapper's constructor throws.
    try (FileInputStream fileInputStream = new FileInputStream(filePath);
            InputStream inputStream = wrapWithFileCodec(fileInputStream)) {
        while (true) {
            byte[] bytes = StreamUtils.readBytes(inputStream, Long.BYTES);
            if (bytes == null) {
                // End of file on an entry boundary: no more data blocks.
                break;
            }
            long taskAttemptId = ByteBufUtils.readLong(bytes, 0);
            bytes = StreamUtils.readBytes(inputStream, Integer.BYTES);
            if (bytes == null) {
                throw new RssInvalidDataException(String.format(
                        "Unexpected end of file while reading data block length for task attempt %s",
                        taskAttemptId));
            }
            int dataBlockLength = ByteBufUtils.readInt(bytes, 0);
            if (dataBlockLength < 0) {
                throw new RssInvalidDataException(String.format(
                        "Invalid data block length %s for task attempt %s", dataBlockLength, taskAttemptId));
            }
            byte[] dataBlockBytes = StreamUtils.readBytes(inputStream, dataBlockLength);
            if (dataBlockBytes == null) {
                throw new RssInvalidDataException(String.format(
                        "Unexpected end of file while reading %s bytes of data block for task attempt %s",
                        dataBlockLength, taskAttemptId));
            }
            dataBlockStreamData.writeBytes(dataBlockBytes);
            System.out.println(String.format("Got data block from task attempt %s, %s bytes", taskAttemptId, dataBlockLength));
        }
    } catch (Exception e) {
        // Errors (e.g. OutOfMemoryError) are deliberately not caught and propagate unchanged.
        throw new RuntimeException("Failed to read partition file " + filePath, e);
    }
    return dataBlockStreamData;
}

/**
 * Wraps {@code in} with the stream decoder selected by {@code fileCompressCodec}.
 *
 * @return the wrapped stream, or {@code in} itself when the codec is neither LZ4 nor ZSTD
 */
private InputStream wrapWithFileCodec(InputStream in) throws java.io.IOException {
    // Constant-first equals so a null codec means "uncompressed" instead of throwing an NPE.
    if (Compression.COMPRESSION_CODEC_LZ4.equals(fileCompressCodec)) {
        return new LZ4BlockInputStream(in);
    }
    if (Compression.COMPRESSION_CODEC_ZSTD.equals(fileCompressCodec)) {
        return new ZstdInputStream(in);
    }
    return in;
}

/**
 * Decompresses the concatenated data blocks according to {@code blockCompressCodec}.
 *
 * <p>For LZ4 and ZSTD, each block is read as an int compressed length, an int uncompressed
 * length, and then the compressed bytes. The decompressed blocks are concatenated into a new
 * buffer. For any other codec, including null, the input buffer is returned unchanged.
 *
 * @throws RssInvalidDataException if a block header has a negative length, or a block does not
 *     decompress cleanly to exactly its declared size
 */
private ByteBuf decompressDataBlockStream(ByteBuf dataBlockStreamData) {
    boolean lz4 = Compression.COMPRESSION_CODEC_LZ4.equals(blockCompressCodec);
    boolean zstd = Compression.COMPRESSION_CODEC_ZSTD.equals(blockCompressCodec);
    if (!lz4 && !zstd) {
        return dataBlockStreamData;
    }
    LZ4FastDecompressor lz4Decompressor = lz4 ? LZ4Factory.fastestInstance().fastDecompressor() : null;
    ByteBuf uncompressedData = Unpooled.buffer(1000);
    while (dataBlockStreamData.readableBytes() > 0) {
        int compressedLen = dataBlockStreamData.readInt();
        int uncompressedLen = dataBlockStreamData.readInt();
        if (compressedLen < 0 || uncompressedLen < 0) {
            throw new RssInvalidDataException(String.format(
                    "Invalid compressed data block header, compressed length: %s, uncompressed length: %s",
                    compressedLen, uncompressedLen));
        }
        byte[] compressedBytes = new byte[compressedLen];
        byte[] uncompressedBytes = new byte[uncompressedLen];
        dataBlockStreamData.readBytes(compressedBytes);
        if (lz4) {
            // decompress(src, dest) restores dest.length bytes and returns how many src bytes it read.
            int consumedLen = lz4Decompressor.decompress(compressedBytes, uncompressedBytes);
            if (consumedLen != compressedLen) {
                throw new RssInvalidDataException(String.format(
                        "LZ4 data block consumed %s bytes but its header declares %s compressed bytes",
                        consumedLen, compressedLen));
            }
        } else {
            // zstd-jni's signature is decompress(dst, src): destination first, source second.
            long decompressResult = Zstd.decompress(uncompressedBytes, compressedBytes);
            if (Zstd.isError(decompressResult)) {
                throw new RssInvalidDataException("Failed to decompress zstd data, returned value: " + decompressResult);
            }
            if (decompressResult != uncompressedLen) {
                throw new RssInvalidDataException(String.format(
                        "Zstd data block decompressed to %s bytes but its header declares %s",
                        decompressResult, uncompressedLen));
            }
        }
        uncompressedData.writeBytes(uncompressedBytes);
    }
    return uncompressedData;
}

/**
 * Walks the uncompressed stream as a sequence of key/value records.
 *
 * <p>Each record is an int key length, the key bytes, an int value length, and then the value
 * bytes. Only positive lengths consume payload bytes. Payloads are skipped rather than copied,
 * since only the framing is checked here.
 *
 * @throws IndexOutOfBoundsException if the stream ends inside a record
 */
private void checkRecordStream(ByteBuf uncompressedData) {
    while (uncompressedData.readableBytes() > 0) {
        int keyLen = uncompressedData.readInt();
        if (keyLen > 0) {
            uncompressedData.skipBytes(keyLen);
        }
        int valueLen = uncompressedData.readInt();
        if (valueLen > 0) {
            uncompressedData.skipBytes(valueLen);
        }
    }
}