private void processBlocksFromReader()

in parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java [255:335]


  private void processBlocksFromReader() throws IOException {
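    // Walk the input one row group at a time; each retained column chunk is either copied as-is,
    // re-compressed, nullified, or encrypted into the writer.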
    PageReadStore store = reader.readNextRowGroup();
    ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy);
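    // Index the target schema's columns by path; chunks whose path is absent here were pruned.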
    Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
            Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));

    int blockId = 0;
    while (store != null) {
      writer.startBlock(store.getRowCount());

      BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
      List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();

      for (int i = 0, columnId = 0; i < columnsInOrder.size(); i++) {
        ColumnChunkMetaData chunk = columnsInOrder.get(i);
        ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());

        // This column has been pruned.
        if (descriptor == null) {
          continue;
        }

        // If a column is already encrypted, throw an exception.
        // Later we can add a feature to trans-encrypt it with different keys.
        if (chunk.isEncrypted()) {
          throw new IOException("Column " + chunk.getPath().toDotString() + " is already encrypted");
        }

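        // Seek the input stream to this chunk and resolve the codec and encryption settings for the output.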
        reader.setStreamPosition(chunk.getStartingPos());
        CompressionCodecName newCodecName = this.newCodecName == null ? chunk.getCodec() : this.newCodecName;
        boolean encryptColumn = encryptMode && encryptColumns != null && encryptColumns.contains(chunk.getPath());

        if (maskColumns != null && maskColumns.containsKey(chunk.getPath())) {
          // Mask column and compress it again.
          MaskMode maskMode = maskColumns.get(chunk.getPath());
          if (maskMode.equals(MaskMode.NULLIFY)) {
            Type.Repetition repetition = descriptor.getPrimitiveType().getRepetition();
            if (repetition.equals(Type.Repetition.REQUIRED)) {
              throw new IOException(
                      "Required column [" + descriptor.getPrimitiveType().getName() + "] cannot be nullified");
            }
            nullifyColumn(
                    descriptor,
                    chunk,
                    crStore,
                    writer,
                    schema,
                    newCodecName,
                    encryptColumn);
          } else {
            throw new UnsupportedOperationException("Only nullify is supported for now");
          }
        } else if (encryptMode || this.newCodecName != null) {
          // Prepare encryption context
          ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime = null;
          if (encryptMode) {
            columnChunkEncryptorRunTime =
                    new ColumnChunkEncryptorRunTime(writer.getEncryptor(), chunk, numBlocksRewritten, columnId);
          }

          // Translate compression and/or encryption
          writer.startColumn(descriptor, crStore.getColumnReader(descriptor).getTotalValueCount(), newCodecName);
          processChunk(chunk, newCodecName, columnChunkEncryptorRunTime, encryptColumn);
          writer.endColumn();
        } else {
          // Nothing changed; copy the raw chunk bytes along with its bloom filter and indexes.
          BloomFilter bloomFilter = reader.readBloomFilter(chunk);
          ColumnIndex columnIndex = reader.readColumnIndex(chunk);
          OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
          writer.appendColumnChunk(descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, offsetIndex);
        }

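        // The output column ordinal only advances for retained columns; it feeds the per-chunk encryption context.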
        columnId++;
      }

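      // All retained columns for this row group have been written; close it and advance to the next one.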
      writer.endBlock();
      store = reader.readNextRowGroup();
      // Re-bind the column read store to the new row group; otherwise nullification and value counts
      // would keep reading from the first one.
      crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy);
      blockId++;
      numBlocksRewritten++;
    }
  }
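
For context, this private loop is normally driven by the rewriter's public entry point rather than called directly. Below is a minimal sketch of how a caller might exercise it through RewriteOptions and ParquetRewriter.processBlocks(); the input/output paths and column names (debug_payload, ssn) are illustrative, and the builder methods shown (prune, mask, transform) are assumed from recent parquet-hadoop releases.

import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.rewrite.MaskMode;
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

public class RewriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Paths and column names are illustrative.
    RewriteOptions options = new RewriteOptions.Builder(
            conf, new Path("/data/input.parquet"), new Path("/data/output.parquet"))
        .prune(Collections.singletonList("debug_payload"))        // drop a column
        .mask(Collections.singletonMap("ssn", MaskMode.NULLIFY))  // nullify a column
        .transform(CompressionCodecName.ZSTD)                     // translate compression
        .build();

    // processBlocks() drives processBlocksFromReader() over every row group of the input.
    ParquetRewriter rewriter = new ParquetRewriter(options);
    rewriter.processBlocks();
    rewriter.close();
  }
}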