in parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java [485:576]
private void processBlock(
        TransParquetFileReader reader,
        int blockIdx,
        int outColumnIdx,
        IndexCache indexCache,
        ColumnChunkMetaData chunk)
        throws IOException {
    if (chunk.isEncrypted()) {
        throw new IOException("Column " + chunk.getPath().toDotString() + " is already encrypted");
    }
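    // When columns are being renamed, rebuild the chunk metadata so its path and primitive type
    // carry the normalized (output) names instead of the input ones.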
    ColumnChunkMetaData chunkNormalized = chunk;
    if (!renamedColumns.isEmpty()) {
        // Keep an eye on whether this gets stale because of ColumnChunkMetaData changes
        chunkNormalized = ColumnChunkMetaData.get(
                normalizeFieldsInPath(chunk.getPath()),
                normalizeNameInType(chunk.getPrimitiveType()),
                chunk.getCodec(),
                chunk.getEncodingStats(),
                chunk.getEncodings(),
                chunk.getStatistics(),
                chunk.getFirstDataPageOffset(),
                chunk.getDictionaryPageOffset(),
                chunk.getValueCount(),
                chunk.getTotalSize(),
                chunk.getTotalUncompressedSize(),
                chunk.getSizeStatistics());
    }
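    // Resolve the output column under both schemas: the original descriptor drives the nullify path,
    // while the renamed descriptor is what the writer sees.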
    ColumnDescriptor descriptorOriginal = outSchema.getColumns().get(outColumnIdx);
    ColumnDescriptor descriptorRenamed =
            getSchemaWithRenamedColumns(outSchema).getColumns().get(outColumnIdx);
    BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(blockIdx);
    String originalCreatedBy = reader.getFileMetaData().getCreatedBy();
    reader.setStreamPosition(chunk.getStartingPos());
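    // Keep the chunk's existing codec unless a target codec was requested for the rewrite.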
    CompressionCodecName newCodecName = this.newCodecName == null ? chunk.getCodec() : this.newCodecName;
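    // Encrypt this column only when encryption mode is on and the column was explicitly selected.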
    boolean encryptColumn = encryptMode && encryptColumns != null && encryptColumns.contains(chunk.getPath());
    if (maskColumns != null && maskColumns.containsKey(chunk.getPath())) {
        // Mask column and compress it again.
        MaskMode maskMode = maskColumns.get(chunk.getPath());
        if (maskMode.equals(MaskMode.NULLIFY)) {
            Type.Repetition repetition =
                    descriptorOriginal.getPrimitiveType().getRepetition();
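            // A REQUIRED column cannot hold nulls, so it cannot be masked by nullification.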
            if (repetition.equals(Type.Repetition.REQUIRED)) {
                throw new IOException("Required column ["
                        + descriptorOriginal.getPrimitiveType().getName() + "] cannot be nullified");
            }
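            // Rewrite the chunk with all values nulled out, using the target codec and encryption setting.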
            nullifyColumn(
                    reader,
                    blockIdx,
                    descriptorOriginal,
                    chunk,
                    writer,
                    newCodecName,
                    encryptColumn,
                    originalCreatedBy);
        } else {
            throw new UnsupportedOperationException("Only nullify is supported for now");
        }
    } else if (encryptMode || this.newCodecName != null) {
        // Prepare encryption context
        ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime = null;
        if (encryptMode) {
            columnChunkEncryptorRunTime =
                    new ColumnChunkEncryptorRunTime(writer.getEncryptor(), chunk, numBlocksRewritten, outColumnIdx);
        }
        // Translate compression and/or encryption
        writer.startColumn(descriptorRenamed, chunk.getValueCount(), newCodecName);
        processChunk(
                reader,
                blockMetaData.getRowCount(),
                chunk,
                newCodecName,
                columnChunkEncryptorRunTime,
                encryptColumn,
                indexCache.getBloomFilter(chunk),
                indexCache.getColumnIndex(chunk),
                indexCache.getOffsetIndex(chunk),
                originalCreatedBy);
        writer.endColumn();
    } else {
        // Nothing changed, simply copy the binary data.
        BloomFilter bloomFilter = indexCache.getBloomFilter(chunk);
        ColumnIndex columnIndex = indexCache.getColumnIndex(chunk);
        OffsetIndex offsetIndex = indexCache.getOffsetIndex(chunk);
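        // Byte-for-byte copy of the chunk, re-attaching its bloom filter and indexes under the
        // (possibly renamed) output descriptor and the normalized metadata.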
        writer.appendColumnChunk(
                descriptorRenamed, reader.getStream(), chunkNormalized, bloomFilter, columnIndex, offsetIndex);
    }
}