in parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java [255:335]
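/**
 * Rewrites the input file block by block: each row group is read, and every surviving
 * column chunk is either nullified, re-encoded (new codec and/or encryption), or copied
 * through as raw bytes, depending on the rewrite options.
 */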
private void processBlocksFromReader() throws IOException {
  PageReadStore store = reader.readNextRowGroup();
  ColumnReadStoreImpl crStore =
      new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy);
  Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream()
      .collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));

  int blockId = 0;
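  // One pass per row group (block) of the input file.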
  while (store != null) {
    writer.startBlock(store.getRowCount());

    BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
    List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();

    for (int i = 0, columnId = 0; i < columnsInOrder.size(); i++) {
      ColumnChunkMetaData chunk = columnsInOrder.get(i);
      ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());

      // This column was pruned from the output schema; skip it.
      if (descriptor == null) {
        continue;
      }

      // If a column is already encrypted, simply throw an exception;
      // trans-encrypting it with different keys could be added as a feature later.
      if (chunk.isEncrypted()) {
        throw new IOException("Column " + chunk.getPath().toDotString() + " is already encrypted");
      }
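
      // Seek to the chunk's first byte; keep its codec unless a new one was requested.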
      reader.setStreamPosition(chunk.getStartingPos());
      CompressionCodecName newCodecName =
          this.newCodecName == null ? chunk.getCodec() : this.newCodecName;
      boolean encryptColumn =
          encryptMode && encryptColumns != null && encryptColumns.contains(chunk.getPath());

      if (maskColumns != null && maskColumns.containsKey(chunk.getPath())) {
        // Mask the column and compress it again.
        MaskMode maskMode = maskColumns.get(chunk.getPath());
        if (maskMode.equals(MaskMode.NULLIFY)) {
          Type.Repetition repetition = descriptor.getPrimitiveType().getRepetition();
          if (repetition.equals(Type.Repetition.REQUIRED)) {
            throw new IOException(
                "Required column [" + descriptor.getPrimitiveType().getName() + "] cannot be nullified");
          }
          nullifyColumn(descriptor, chunk, crStore, writer, schema, newCodecName, encryptColumn);
        } else {
          throw new UnsupportedOperationException("Only nullify is supported for now");
        }
      } else if (encryptMode || this.newCodecName != null) {
        // Prepare encryption context
        ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime = null;
        if (encryptMode) {
          columnChunkEncryptorRunTime =
              new ColumnChunkEncryptorRunTime(writer.getEncryptor(), chunk, numBlocksRewritten, columnId);
        }

        // Translate compression and/or encryption
        writer.startColumn(
            descriptor, crStore.getColumnReader(descriptor).getTotalValueCount(), newCodecName);
        processChunk(chunk, newCodecName, columnChunkEncryptorRunTime, encryptColumn);
        writer.endColumn();
      } else {
        // Nothing changed; simply copy the binary data.
        BloomFilter bloomFilter = reader.readBloomFilter(chunk);
        ColumnIndex columnIndex = reader.readColumnIndex(chunk);
        OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
        writer.appendColumnChunk(
            descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, offsetIndex);
      }
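
      // Pruned columns were skipped above, so columnId counts only the columns actually written.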
      columnId++;
    }

    writer.endBlock();
    store = reader.readNextRowGroup();
    blockId++;
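    // numBlocksRewritten also serves as the row-group ordinal handed to the encryptor above.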
    numBlocksRewritten++;
  }
}
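
// Usage sketch (not part of ParquetRewriter.java): one way the method above is reached
// through the public rewrite API. This is a minimal example assuming the RewriteOptions
// builder that ships alongside ParquetRewriter in parquet-hadoop 1.13+; the paths and
// column names below are illustrative only.
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.rewrite.MaskMode;
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

public class RewriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    RewriteOptions options = new RewriteOptions.Builder(
            conf, new Path("hdfs:///data/in.parquet"), new Path("hdfs:///data/out.parquet"))
        .prune(Collections.singletonList("internal_id"))          // drop this column entirely
        .mask(Collections.singletonMap("ssn", MaskMode.NULLIFY))  // nullify a sensitive column
        .transform(CompressionCodecName.ZSTD)                     // re-compress remaining chunks
        .build();
    // processBlocks() walks every input file, invoking processBlocksFromReader() for each.
    try (ParquetRewriter rewriter = new ParquetRewriter(options)) {
      rewriter.processBlocks();
    }
  }
}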