private void processChunk()

in parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java [337:488]
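
Copies one column chunk page-by-page from `reader` to `writer`. Pages are decompressed and re-compressed only when `newCodecName` differs from the chunk's existing codec, and are encrypted when a `ColumnChunkEncryptorRunTime` is supplied; the chunk's column and offset indexes, when present, feed the per-page statistics conversion and row counts.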


  private void processChunk(ColumnChunkMetaData chunk,
                            CompressionCodecName newCodecName,
                            ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime,
                            boolean encryptColumn) throws IOException {
    CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
    CompressionCodecFactory.BytesInputDecompressor decompressor = null;
    CompressionCodecFactory.BytesInputCompressor compressor = null;
    if (!newCodecName.equals(chunk.getCodec())) {
      // Re-compress only if a different codec has been specified
      decompressor = codecFactory.getDecompressor(chunk.getCodec());
      compressor = codecFactory.getCompressor(newCodecName);
    }

    // EncryptorRunTime is only provided when encryption is required
    BlockCipher.Encryptor metaEncryptor = null;
    BlockCipher.Encryptor dataEncryptor = null;
    byte[] dictPageAAD = null;
    byte[] dataPageAAD = null;
    byte[] dictPageHeaderAAD = null;
    byte[] dataPageHeaderAAD = null;
    if (columnChunkEncryptorRunTime != null) {
      metaEncryptor = columnChunkEncryptorRunTime.getMetaDataEncryptor();
      dataEncryptor = columnChunkEncryptorRunTime.getDataEncryptor();
      dictPageAAD = columnChunkEncryptorRunTime.getDictPageAAD();
      dataPageAAD = columnChunkEncryptorRunTime.getDataPageAAD();
      dictPageHeaderAAD = columnChunkEncryptorRunTime.getDictPageHeaderAAD();
      dataPageHeaderAAD = columnChunkEncryptorRunTime.getDataPageHeaderAAD();
    }

    ColumnIndex columnIndex = reader.readColumnIndex(chunk);
    OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);

    reader.setStreamPosition(chunk.getStartingPos());
    DictionaryPage dictionaryPage = null;
    long readValues = 0;
    Statistics statistics = null;
    ParquetMetadataConverter converter = new ParquetMetadataConverter();
    int pageOrdinal = 0;
    long totalChunkValues = chunk.getValueCount();
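    // Copy pages until every value in the chunk has been consumed; pageOrdinal counts
    // data pages for AAD updates and offset-index lookups.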
    while (readValues < totalChunkValues) {
      PageHeader pageHeader = reader.readPageHeader();
      int compressedPageSize = pageHeader.getCompressed_page_size();
      byte[] pageLoad;
      switch (pageHeader.type) {
        case DICTIONARY_PAGE:
          if (dictionaryPage != null) {
            throw new IOException("Column chunk contains more than one dictionary page");
          }
          // No quickUpdatePageAAD is needed: dictionary page AADs do not carry a page ordinal
          DictionaryPageHeader dictPageHeader = pageHeader.dictionary_page_header;
          pageLoad = processPageLoad(reader,
                  true,
                  compressor,
                  decompressor,
                  compressedPageSize,
                  pageHeader.getUncompressed_page_size(),
                  encryptColumn,
                  dataEncryptor,
                  dictPageAAD);
          writer.writeDictionaryPage(new DictionaryPage(BytesInput.from(pageLoad),
                          pageHeader.getUncompressed_page_size(),
                          dictPageHeader.getNum_values(),
                          converter.getEncoding(dictPageHeader.getEncoding())),
                  metaEncryptor,
                  dictPageHeaderAAD);
          break;
        case DATA_PAGE:
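          // Data page AADs embed the page ordinal; refresh both the header and payload AADs
          // so this page encrypts with a unique AAD.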
          if (encryptColumn) {
            AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
            AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
          }
          DataPageHeader headerV1 = pageHeader.data_page_header;
          pageLoad = processPageLoad(reader,
                  true,
                  compressor,
                  decompressor,
                  compressedPageSize,
                  pageHeader.getUncompressed_page_size(),
                  encryptColumn,
                  dataEncryptor,
                  dataPageAAD);
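          // Rebuild the page statistics; the column index (when present) and originalCreatedBy
          // feed the conversion.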
          statistics = convertStatistics(
                  originalCreatedBy, chunk.getPrimitiveType(), headerV1.getStatistics(), columnIndex, pageOrdinal, converter);
          readValues += headerV1.getNum_values();
          if (offsetIndex != null) {
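            // The offset index gives exact first/last row positions, so the page's row count
            // can be carried through to the new file.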
            long rowCount = offsetIndex.getLastRowIndex(pageOrdinal, totalChunkValues)
                    - offsetIndex.getFirstRowIndex(pageOrdinal) + 1;
            writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
                    pageHeader.getUncompressed_page_size(),
                    BytesInput.from(pageLoad),
                    statistics,
                    toIntWithCheck(rowCount),
                    converter.getEncoding(headerV1.getRepetition_level_encoding()),
                    converter.getEncoding(headerV1.getDefinition_level_encoding()),
                    converter.getEncoding(headerV1.getEncoding()),
                    metaEncryptor,
                    dataPageHeaderAAD);
          } else {
            writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
                    pageHeader.getUncompressed_page_size(),
                    BytesInput.from(pageLoad),
                    statistics,
                    converter.getEncoding(headerV1.getRepetition_level_encoding()),
                    converter.getEncoding(headerV1.getDefinition_level_encoding()),
                    converter.getEncoding(headerV1.getEncoding()),
                    metaEncryptor,
                    dataPageHeaderAAD);
          }
          pageOrdinal++;
          break;
        case DATA_PAGE_V2:
          if (encryptColumn) {
            AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
            AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
          }
          DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
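          // v2 pages store repetition/definition levels uncompressed ahead of the payload,
          // so copy the level bytes through verbatim.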
          int rlLength = headerV2.getRepetition_levels_byte_length();
          BytesInput rlLevels = readBlockAllocate(rlLength, reader);
          int dlLength = headerV2.getDefinition_levels_byte_length();
          BytesInput dlLevels = readBlockAllocate(dlLength, reader);
          int payloadLength = compressedPageSize - rlLength - dlLength;
          int rawDataLength = pageHeader.getUncompressed_page_size() - rlLength - dlLength;
          pageLoad = processPageLoad(
                  reader,
                  headerV2.is_compressed,
                  compressor,
                  decompressor,
                  payloadLength,
                  rawDataLength,
                  encryptColumn,
                  dataEncryptor,
                  dataPageAAD);
          statistics = convertStatistics(
                  originalCreatedBy, chunk.getPrimitiveType(), headerV2.getStatistics(), columnIndex, pageOrdinal, converter);
          readValues += headerV2.getNum_values();
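          // Note: as written, the v2 page header is emitted without metaEncryptor /
          // dataPageHeaderAAD; only the payload was encrypted in processPageLoad above.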
          writer.writeDataPageV2(headerV2.getNum_rows(),
                  headerV2.getNum_nulls(),
                  headerV2.getNum_values(),
                  rlLevels,
                  dlLevels,
                  converter.getEncoding(headerV2.getEncoding()),
                  BytesInput.from(pageLoad),
                  rawDataLength,
                  statistics);
          pageOrdinal++;
          break;
        default:
          LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize);
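          // Note: the unread payload bytes are not consumed here; in practice the page
          // types handled above are exhaustive.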
          break;
      }
    }
  }
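
For context, here is a minimal sketch of a per-column driver loop that could invoke this method, assuming the enclosing rewriter's `reader`/`writer` fields seen above. The helper name `rewriteBlockSketch`, the `schema` parameter, and the unencrypted arguments are illustrative, not the actual caller:

  // Hypothetical driver (not the actual caller): transcode every chunk of one
  // block to newCodecName with encryption disabled.
  private void rewriteBlockSketch(BlockMetaData block,
                                  MessageType schema,
                                  CompressionCodecName newCodecName) throws IOException {
    for (ColumnChunkMetaData chunk : block.getColumns()) {
      ColumnDescriptor descriptor = schema.getColumnDescription(chunk.getPath().toArray());
      writer.startColumn(descriptor, chunk.getValueCount(), newCodecName);
      processChunk(chunk, newCodecName, null /* no encryptor run time */, false /* encryptColumn */);
      writer.endColumn();
    }
  }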