private void processChunk()

in parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java [578:788]


  private void processChunk(
      TransParquetFileReader reader,
      long blockRowCount,
      ColumnChunkMetaData chunk,
      CompressionCodecName newCodecName,
      ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime,
      boolean encryptColumn,
      BloomFilter bloomFilter,
      ColumnIndex columnIndex,
      OffsetIndex offsetIndex,
      String originalCreatedBy)
      throws IOException {
    CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
    CompressionCodecFactory.BytesInputDecompressor decompressor = null;
    CompressionCodecFactory.BytesInputCompressor compressor = null;
    if (!newCodecName.equals(chunk.getCodec())) {
      // Re-compress only if a different codec has been specified
      decompressor = codecFactory.getDecompressor(chunk.getCodec());
      compressor = codecFactory.getCompressor(newCodecName);
    }

    // EncryptorRunTime is only provided when encryption is required
    BlockCipher.Encryptor metaEncryptor = null;
    BlockCipher.Encryptor dataEncryptor = null;
    byte[] dictPageAAD = null;
    byte[] dataPageAAD = null;
    byte[] dictPageHeaderAAD = null;
    byte[] dataPageHeaderAAD = null;
    if (columnChunkEncryptorRunTime != null) {
      metaEncryptor = columnChunkEncryptorRunTime.getMetaDataEncryptor();
      dataEncryptor = columnChunkEncryptorRunTime.getDataEncryptor();
      dictPageAAD = columnChunkEncryptorRunTime.getDictPageAAD();
      dataPageAAD = columnChunkEncryptorRunTime.getDataPageAAD();
      dictPageHeaderAAD = columnChunkEncryptorRunTime.getDictPageHeaderAAD();
      dataPageHeaderAAD = columnChunkEncryptorRunTime.getDataPageHeaderAAD();
    }

    if (bloomFilter != null) {
      writer.addBloomFilter(normalizeFieldsInPath(chunk.getPath()).toDotString(), bloomFilter);
    }

    reader.setStreamPosition(chunk.getStartingPos());
    DictionaryPage dictionaryPage = null;
    long readValues = 0L;
    long readRows = 0L;
    Statistics<?> statistics = null;
    boolean isColumnStatisticsMalformed = false;
    ParquetMetadataConverter converter = new ParquetMetadataConverter();
    int pageOrdinal = 0;
    long totalChunkValues = chunk.getValueCount();
    while (readValues < totalChunkValues) {
      PageHeader pageHeader = reader.readPageHeader();
      int compressedPageSize = pageHeader.getCompressed_page_size();
      byte[] pageLoad;
      switch (pageHeader.type) {
        case DICTIONARY_PAGE:
          if (dictionaryPage != null) {
            throw new IOException("has more than one dictionary page in column chunk: " + chunk);
          }
          // No quickUpdatePageAAD needed for dictionary page
          DictionaryPageHeader dictPageHeader = pageHeader.dictionary_page_header;
          pageLoad = processPageLoad(
              reader,
              true,
              compressor,
              decompressor,
              pageHeader.getCompressed_page_size(),
              pageHeader.getUncompressed_page_size(),
              encryptColumn,
              dataEncryptor,
              dictPageAAD);
          dictionaryPage = new DictionaryPage(
              BytesInput.from(pageLoad),
              pageHeader.getUncompressed_page_size(),
              dictPageHeader.getNum_values(),
              converter.getEncoding(dictPageHeader.getEncoding()));
          writer.writeDictionaryPage(dictionaryPage, metaEncryptor, dictPageHeaderAAD);
          break;
        case DATA_PAGE:
          if (encryptColumn) {
            AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
            AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
          }
          DataPageHeader headerV1 = pageHeader.data_page_header;
          pageLoad = processPageLoad(
              reader,
              true,
              compressor,
              decompressor,
              pageHeader.getCompressed_page_size(),
              pageHeader.getUncompressed_page_size(),
              encryptColumn,
              dataEncryptor,
              dataPageAAD);
          statistics = convertStatistics(
              originalCreatedBy,
              normalizeNameInType(chunk.getPrimitiveType()),
              headerV1.getStatistics(),
              columnIndex,
              pageOrdinal,
              converter);
          if (statistics == null) {
            // Reach here means both the columnIndex and the page header statistics are null
            isColumnStatisticsMalformed = true;
          } else {
            Preconditions.checkState(
                !isColumnStatisticsMalformed,
                "Detected mixed null page statistics and non-null page statistics");
          }
          readValues += headerV1.getNum_values();
          if (offsetIndex != null) {
            long rowCount = 1
                + offsetIndex.getLastRowIndex(pageOrdinal, blockRowCount)
                - offsetIndex.getFirstRowIndex(pageOrdinal);
            readRows += rowCount;
            writer.writeDataPage(
                toIntWithCheck(headerV1.getNum_values()),
                pageHeader.getUncompressed_page_size(),
                BytesInput.from(pageLoad),
                statistics,
                toIntWithCheck(rowCount),
                converter.getEncoding(headerV1.getRepetition_level_encoding()),
                converter.getEncoding(headerV1.getDefinition_level_encoding()),
                converter.getEncoding(headerV1.getEncoding()),
                metaEncryptor,
                dataPageHeaderAAD);
          } else {
            writer.writeDataPage(
                toIntWithCheck(headerV1.getNum_values()),
                pageHeader.getUncompressed_page_size(),
                BytesInput.from(pageLoad),
                statistics,
                converter.getEncoding(headerV1.getRepetition_level_encoding()),
                converter.getEncoding(headerV1.getDefinition_level_encoding()),
                converter.getEncoding(headerV1.getEncoding()),
                metaEncryptor,
                dataPageHeaderAAD);
          }
          pageOrdinal++;
          break;
        case DATA_PAGE_V2:
          if (encryptColumn) {
            AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
            AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
          }
          DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
          int rlLength = headerV2.getRepetition_levels_byte_length();
          BytesInput rlLevels = readBlockAllocate(rlLength, reader);
          int dlLength = headerV2.getDefinition_levels_byte_length();
          BytesInput dlLevels = readBlockAllocate(dlLength, reader);
          int payLoadLength = pageHeader.getCompressed_page_size() - rlLength - dlLength;
          int rawDataLength = pageHeader.getUncompressed_page_size() - rlLength - dlLength;
          pageLoad = processPageLoad(
              reader,
              headerV2.is_compressed,
              compressor,
              decompressor,
              payLoadLength,
              rawDataLength,
              encryptColumn,
              dataEncryptor,
              dataPageAAD);
          statistics = convertStatistics(
              originalCreatedBy,
              normalizeNameInType(chunk.getPrimitiveType()),
              headerV2.getStatistics(),
              columnIndex,
              pageOrdinal,
              converter);
          if (statistics == null) {
            // Reach here means both the columnIndex and the page header statistics are null
            isColumnStatisticsMalformed = true;
          } else {
            Preconditions.checkState(
                !isColumnStatisticsMalformed,
                "Detected mixed null page statistics and non-null page statistics");
          }
          readValues += headerV2.getNum_values();
          readRows += headerV2.getNum_rows();
          writer.writeDataPageV2(
              headerV2.getNum_rows(),
              headerV2.getNum_nulls(),
              headerV2.getNum_values(),
              rlLevels,
              dlLevels,
              converter.getEncoding(headerV2.getEncoding()),
              BytesInput.from(pageLoad),
              headerV2.is_compressed,
              rawDataLength,
              statistics,
              metaEncryptor,
              dataPageHeaderAAD);
          pageOrdinal++;
          break;
        default:
          LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize);
          break;
      }
    }

    Preconditions.checkState(
        readRows == 0 || readRows == blockRowCount,
        "Read row count: %s not match with block total row count: %s",
        readRows,
        blockRowCount);

    if (isColumnStatisticsMalformed) {
      // All the column statistics are invalid, so we need to overwrite the column statistics
      writer.invalidateStatistics(chunk.getStatistics());
    }
  }