private ColumnPageReader readAllPages()

in common/src/main/java/org/apache/comet/parquet/FileReader.java [859:1019]
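
Reads the dictionary page (if present) and all V1/V2 data pages of the current column chunk,
verifying CRCs and decrypting page headers when configured, and wraps them in a ColumnPageReader.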


    private ColumnPageReader readAllPages(
        BlockCipher.Decryptor headerBlockDecryptor,
        BlockCipher.Decryptor pageBlockDecryptor,
        byte[] aadPrefix,
        int rowGroupOrdinal,
        int columnOrdinal)
        throws IOException {
      List<DataPage> pagesInChunk = new ArrayList<>();
      DictionaryPage dictionaryPage = null;
      PrimitiveType type =
          fileMetaData.getSchema().getType(descriptor.col.getPath()).asPrimitiveType();

      long valuesCountReadSoFar = 0;
      int dataPageCountReadSoFar = 0;
      byte[] dataPageHeaderAAD = null;
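      // If page headers are encrypted, precompute the module AAD for data page headers; the
      // page-ordinal portion is updated in place as each page is read.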
      if (null != headerBlockDecryptor) {
        dataPageHeaderAAD =
            AesCipher.createModuleAAD(
                aadPrefix,
                ModuleCipherFactory.ModuleType.DataPageHeader,
                rowGroupOrdinal,
                columnOrdinal,
                getPageOrdinal(dataPageCountReadSoFar));
      }
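      // valuesCountReadSoFar and dataPageCountReadSoFar drive both the hasMorePages termination
      // check and the per-page AAD ordinal below.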
      while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
        byte[] pageHeaderAAD = dataPageHeaderAAD;
        if (null != headerBlockDecryptor) {
          // Important: this verifies file integrity (makes sure the dictionary page has not
          // been removed)
          if (null == dictionaryPage && descriptor.metadata.hasDictionaryPage()) {
            pageHeaderAAD =
                AesCipher.createModuleAAD(
                    aadPrefix,
                    ModuleCipherFactory.ModuleType.DictionaryPageHeader,
                    rowGroupOrdinal,
                    columnOrdinal,
                    -1);
          } else {
            int pageOrdinal = getPageOrdinal(dataPageCountReadSoFar);
            AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
          }
        }

        PageHeader pageHeader = readPageHeader(headerBlockDecryptor, pageHeaderAAD);
        int uncompressedPageSize = pageHeader.getUncompressed_page_size();
        int compressedPageSize = pageHeader.getCompressed_page_size();
        final BytesInput pageBytes;
        switch (pageHeader.type) {
          case DICTIONARY_PAGE:
            // there is only one dictionary page per column chunk
            if (dictionaryPage != null) {
              throw new ParquetDecodingException(
                  "more than one dictionary page in column " + descriptor.col);
            }
            pageBytes = this.readAsBytesInput(compressedPageSize);
            if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
              verifyCrc(
                  pageHeader.getCrc(),
                  pageBytes.toByteArray(),
                  "could not verify dictionary page integrity, CRC checksum verification failed");
            }
            DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
            dictionaryPage =
                new DictionaryPage(
                    pageBytes,
                    uncompressedPageSize,
                    dicHeader.getNum_values(),
                    converter.getEncoding(dicHeader.getEncoding()));
            // Copy crc to new page, used for testing
            if (pageHeader.isSetCrc()) {
              dictionaryPage.setCrc(pageHeader.getCrc());
            }
            break;

          case DATA_PAGE:
            DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
            pageBytes = this.readAsBytesInput(compressedPageSize);
            if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
              verifyCrc(
                  pageHeader.getCrc(),
                  pageBytes.toByteArray(),
                  "could not verify page integrity, CRC checksum verification failed");
            }
            DataPageV1 dataPageV1 =
                new DataPageV1(
                    pageBytes,
                    dataHeaderV1.getNum_values(),
                    uncompressedPageSize,
                    converter.fromParquetStatistics(
                        getFileMetaData().getCreatedBy(), dataHeaderV1.getStatistics(), type),
                    converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
                    converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
                    converter.getEncoding(dataHeaderV1.getEncoding()));
            // Copy crc to new page, used for testing
            if (pageHeader.isSetCrc()) {
              dataPageV1.setCrc(pageHeader.getCrc());
            }
            pagesInChunk.add(dataPageV1);
            valuesCountReadSoFar += dataHeaderV1.getNum_values();
            ++dataPageCountReadSoFar;
            break;

          case DATA_PAGE_V2:
            DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
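            // In V2 pages the repetition and definition levels are stored separately and are
            // never compressed, so their lengths are subtracted to get the size of the data
            // section.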
            int dataSize =
                compressedPageSize
                    - dataHeaderV2.getRepetition_levels_byte_length()
                    - dataHeaderV2.getDefinition_levels_byte_length();
            pagesInChunk.add(
                new DataPageV2(
                    dataHeaderV2.getNum_rows(),
                    dataHeaderV2.getNum_nulls(),
                    dataHeaderV2.getNum_values(),
                    this.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length()),
                    this.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length()),
                    converter.getEncoding(dataHeaderV2.getEncoding()),
                    this.readAsBytesInput(dataSize),
                    uncompressedPageSize,
                    converter.fromParquetStatistics(
                        getFileMetaData().getCreatedBy(), dataHeaderV2.getStatistics(), type),
                    dataHeaderV2.isIs_compressed()));
            valuesCountReadSoFar += dataHeaderV2.getNum_values();
            ++dataPageCountReadSoFar;
            break;

          default:
            LOG.debug(
                "skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize);
            stream.skipFully(compressedPageSize);
            break;
        }
      }
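      // Without an offset index every page of the chunk has been read, so the total value count
      // can be validated against the column chunk metadata.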
      if (offsetIndex == null && valuesCountReadSoFar != descriptor.metadata.getValueCount()) {
        // Would be nice to have a CorruptParquetFileException or something as a subclass?
        throw new IOException(
            "Expected "
                + descriptor.metadata.getValueCount()
                + " values in column chunk at "
                + file
                + " offset "
                + descriptor.metadata.getFirstDataPageOffset()
                + " but got "
                + valuesCountReadSoFar
                + " values instead over "
                + pagesInChunk.size()
                + " pages ending at file offset "
                + (descriptor.fileOffset + stream.position()));
      }
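      // The pages are still compressed at this point; the decompressor and page-level decryptor
      // are handed to the ColumnPageReader along with them.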
      CompressionCodecFactory.BytesInputDecompressor decompressor =
          options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec());
      return new ColumnPageReader(
          decompressor,
          pagesInChunk,
          dictionaryPage,
          offsetIndex,
          blocks.get(currentBlock).getRowCount(),
          pageBlockDecryptor,
          aadPrefix,
          rowGroupOrdinal,
          columnOrdinal);
    }
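
For reference, the encrypted-header handling above follows parquet-mr's module AAD scheme: the
AAD for data page headers is created once and then updated in place for each page ordinal, while
the dictionary page header gets its own AAD. A minimal standalone sketch of that pattern,
assuming parquet-hadoop's org.apache.parquet.crypto classes are on the classpath (the class name
and values below are made up for illustration):

import org.apache.parquet.crypto.AesCipher;
import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;

public class PageAadSketch {
  public static void main(String[] args) {
    byte[] fileAad = new byte[] {1, 2, 3, 4}; // hypothetical file AAD prefix
    int rowGroupOrdinal = 0;
    int columnOrdinal = 5;

    // AAD for the dictionary page header (the reader above passes -1 for the page ordinal).
    byte[] dictHeaderAad =
        AesCipher.createModuleAAD(
            fileAad, ModuleType.DictionaryPageHeader, rowGroupOrdinal, columnOrdinal, -1);

    // AAD for the first data page header (page ordinal 0).
    byte[] dataHeaderAad =
        AesCipher.createModuleAAD(
            fileAad, ModuleType.DataPageHeader, rowGroupOrdinal, columnOrdinal, 0);

    // Subsequent data pages reuse the same buffer; only the page ordinal is rewritten, which is
    // what AesCipher.quickUpdatePageAAD does inside the loop above.
    for (int pageOrdinal = 1; pageOrdinal < 4; pageOrdinal++) {
      AesCipher.quickUpdatePageAAD(dataHeaderAad, pageOrdinal);
      // dataHeaderAad now matches the AAD used for the header of page `pageOrdinal`.
    }
  }
}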