private void processBlock()

in parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java [485:576]


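Rewrites a single column chunk of one row group: depending on the configured rewrite options the chunk is nullified (masked), re-encoded with a new compression codec and/or encryption, or appended to the output as a raw byte copy.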
  private void processBlock(
      TransParquetFileReader reader,
      int blockIdx,
      int outColumnIdx,
      IndexCache indexCache,
      ColumnChunkMetaData chunk)
      throws IOException {
    if (chunk.isEncrypted()) {
      throw new IOException("Column " + chunk.getPath().toDotString() + " is already encrypted");
    }

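    // When columns are renamed, rebuild the chunk metadata under the normalized path and type so
    // the metadata written out matches the renamed output schema.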
    ColumnChunkMetaData chunkNormalized = chunk;
    if (!renamedColumns.isEmpty()) {
      // Keep an eye on this: it may go stale if ColumnChunkMetaData changes
      chunkNormalized = ColumnChunkMetaData.get(
          normalizeFieldsInPath(chunk.getPath()),
          normalizeNameInType(chunk.getPrimitiveType()),
          chunk.getCodec(),
          chunk.getEncodingStats(),
          chunk.getEncodings(),
          chunk.getStatistics(),
          chunk.getFirstDataPageOffset(),
          chunk.getDictionaryPageOffset(),
          chunk.getValueCount(),
          chunk.getTotalSize(),
          chunk.getTotalUncompressedSize(),
          chunk.getSizeStatistics());
    }

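    // Resolve the output column descriptor both before and after column renaming, along with the
    // source row-group metadata and the created_by string of the input file.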
    ColumnDescriptor descriptorOriginal = outSchema.getColumns().get(outColumnIdx);
    ColumnDescriptor descriptorRenamed =
        getSchemaWithRenamedColumns(outSchema).getColumns().get(outColumnIdx);
    BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(blockIdx);
    String originalCreatedBy = reader.getFileMetaData().getCreatedBy();

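    // Position the reader at the chunk, resolve the effective codec (keep the original unless a new
    // one was requested), and decide whether this column is to be encrypted in the output.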
    reader.setStreamPosition(chunk.getStartingPos());
    CompressionCodecName newCodecName = this.newCodecName == null ? chunk.getCodec() : this.newCodecName;
    boolean encryptColumn = encryptMode && encryptColumns != null && encryptColumns.contains(chunk.getPath());

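    // Three possible actions for this chunk: nullify it (masking), re-encode it (new codec and/or
    // encryption), or copy its bytes untouched.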
    if (maskColumns != null && maskColumns.containsKey(chunk.getPath())) {
      // Mask the column and compress it again.
      MaskMode maskMode = maskColumns.get(chunk.getPath());
      if (maskMode.equals(MaskMode.NULLIFY)) {
        Type.Repetition repetition =
            descriptorOriginal.getPrimitiveType().getRepetition();
        if (repetition.equals(Type.Repetition.REQUIRED)) {
          throw new IOException("Required column ["
              + descriptorOriginal.getPrimitiveType().getName() + "] cannot be nullified");
        }
        nullifyColumn(
            reader,
            blockIdx,
            descriptorOriginal,
            chunk,
            writer,
            newCodecName,
            encryptColumn,
            originalCreatedBy);
      } else {
        throw new UnsupportedOperationException("Only nullify is supported for now");
      }
    } else if (encryptMode || this.newCodecName != null) {
      // Prepare encryption context
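      // The run-time pairs the file encryptor with this chunk's row-group ordinal (numBlocksRewritten)
      // and column ordinal, which parameterize how the chunk's pages are encrypted.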
      ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime = null;
      if (encryptMode) {
        columnChunkEncryptorRunTime =
            new ColumnChunkEncryptorRunTime(writer.getEncryptor(), chunk, numBlocksRewritten, outColumnIdx);
      }

      // Translate compression and/or encryption
      writer.startColumn(descriptorRenamed, chunk.getValueCount(), newCodecName);
      processChunk(
          reader,
          blockMetaData.getRowCount(),
          chunk,
          newCodecName,
          columnChunkEncryptorRunTime,
          encryptColumn,
          indexCache.getBloomFilter(chunk),
          indexCache.getColumnIndex(chunk),
          indexCache.getOffsetIndex(chunk),
          originalCreatedBy);
      writer.endColumn();
    } else {
      // Nothing changed, simply copy the binary data.
      BloomFilter bloomFilter = indexCache.getBloomFilter(chunk);
      ColumnIndex columnIndex = indexCache.getColumnIndex(chunk);
      OffsetIndex offsetIndex = indexCache.getOffsetIndex(chunk);
      writer.appendColumnChunk(
          descriptorRenamed, reader.getStream(), chunkNormalized, bloomFilter, columnIndex, offsetIndex);
    }
  }
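
For context, processBlock is not called directly by users; it is driven from the rewriter's processBlocks() loop over row groups and output columns. Below is a minimal sketch of driving that loop through the public rewrite API, assuming the RewriteOptions builder and ParquetRewriter constructor as they appear in recent parquet-java releases (exact builder methods may vary by version); the driver class name, file paths, and the masked column name "ssn" are illustrative placeholders.

import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.rewrite.MaskMode;
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

// Hypothetical driver class; only the parquet-java rewrite API calls below are real.
public class RewriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path in = new Path(args[0]);   // input Parquet file (placeholder)
    Path out = new Path(args[1]);  // rewritten output file (placeholder)

    RewriteOptions options = new RewriteOptions.Builder(conf, in, out)
        .transform(CompressionCodecName.ZSTD)                      // newCodecName != null -> re-encode branch
        .mask(Collections.singletonMap("ssn", MaskMode.NULLIFY))   // optional column to nullify (placeholder name)
        .build();

    ParquetRewriter rewriter = new ParquetRewriter(options);
    rewriter.processBlocks(); // iterates row groups and columns, ending up in processBlock(...)
    rewriter.close();
  }
}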