public void run()

in src/main/java/com/uber/rss/tools/PartitionFileChecker.java [41:111]


  public void run() {
    ByteBuf dataBlockStreamData = Unpooled.buffer(1000);
    ByteBuf dataBlockStreamUncompressedData = dataBlockStreamData;

    // Read data block stream from file
    try (FileInputStream fileInputStream = new FileInputStream(filePath)) {
      InputStream inputStream = fileInputStream;
      if (fileCompressCodec.equals(Compression.COMPRESSION_CODEC_LZ4)) {
        inputStream = new LZ4BlockInputStream(fileInputStream);
      } else if (fileCompressCodec.equals(Compression.COMPRESSION_CODEC_ZSTD)) {
        inputStream = new ZstdInputStream(fileInputStream);
      }
      while (true) {
        byte[] bytes = StreamUtils.readBytes(inputStream, Long.BYTES);
        if (bytes == null) {
          break;
        }
        long taskAttemptId = ByteBufUtils.readLong(bytes, 0);
        bytes = StreamUtils.readBytes(inputStream, Integer.BYTES);
        int dataBlockLength = ByteBufUtils.readInt(bytes, 0);
        byte[] dataBlockBytes = StreamUtils.readBytes(inputStream, dataBlockLength);
        dataBlockStreamData.writeBytes(dataBlockBytes);
        System.out.println(String.format("Got data block from task attempt %s, %s bytes", taskAttemptId, dataBlockLength));
      }
    } catch (Throwable e) {
      throw new RuntimeException(e);
    }

    if (blockCompressCodec.equals(Compression.COMPRESSION_CODEC_LZ4)) {
      dataBlockStreamUncompressedData = Unpooled.buffer(1000);

      LZ4FastDecompressor decompressor = LZ4Factory.fastestInstance().fastDecompressor();

      while (dataBlockStreamData.readableBytes() > 0) {
        int compressedLen = dataBlockStreamData.readInt();
        int uncompressedLen = dataBlockStreamData.readInt();
        byte[] compressedBytes = new byte[compressedLen];
        byte[] uncompressedBytes = new byte[uncompressedLen];
        dataBlockStreamData.readBytes(compressedBytes);
        decompressor.decompress(compressedBytes, uncompressedBytes);
        dataBlockStreamUncompressedData.writeBytes(uncompressedBytes);
      }
    } else if (blockCompressCodec.equals(Compression.COMPRESSION_CODEC_ZSTD)) {
      dataBlockStreamUncompressedData = Unpooled.buffer(1000);
      while (dataBlockStreamData.readableBytes() > 0) {
        int compressedLen = dataBlockStreamData.readInt();
        int uncompressedLen = dataBlockStreamData.readInt();
        byte[] compressedBytes = new byte[compressedLen];
        byte[] uncompressedBytes = new byte[uncompressedLen];
        dataBlockStreamData.readBytes(compressedBytes);
        long decompressResult = Zstd.decompress(compressedBytes, uncompressedBytes);
        if (Zstd.isError(decompressResult)) {
          throw new RssInvalidDataException("Failed to decompress zstd data, returned value: " + decompressResult);
        }
        dataBlockStreamUncompressedData.writeBytes(uncompressedBytes);
      }
    }

    while (dataBlockStreamUncompressedData.readableBytes() > 0) {
      int keyLen = dataBlockStreamUncompressedData.readInt();
      if (keyLen > 0) {
        byte[] keyBytes = new byte[keyLen];
        dataBlockStreamUncompressedData.readBytes(keyBytes);
      }
      int valueLen = dataBlockStreamUncompressedData.readInt();
      if (valueLen > 0) {
        byte[] valueBytes = new byte[valueLen];
        dataBlockStreamUncompressedData.readBytes(valueBytes);
      }
    }
  }