in parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java [337:488]
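/**
 * Translates a single column chunk page by page: pages are re-compressed only
 * when the target codec differs from the source codec, and pages plus page
 * headers are encrypted when a ColumnChunkEncryptorRunTime is supplied.
 */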
private void processChunk(ColumnChunkMetaData chunk,
CompressionCodecName newCodecName,
ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime,
boolean encryptColumn) throws IOException {
CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
CompressionCodecFactory.BytesInputDecompressor decompressor = null;
CompressionCodecFactory.BytesInputCompressor compressor = null;
if (!newCodecName.equals(chunk.getCodec())) {
// Re-compress only if a different codec has been specified
decompressor = codecFactory.getDecompressor(chunk.getCodec());
compressor = codecFactory.getCompressor(newCodecName);
}
// EncryptorRunTime is only provided when encryption is required
BlockCipher.Encryptor metaEncryptor = null;
BlockCipher.Encryptor dataEncryptor = null;
byte[] dictPageAAD = null;
byte[] dataPageAAD = null;
byte[] dictPageHeaderAAD = null;
byte[] dataPageHeaderAAD = null;
if (columnChunkEncryptorRunTime != null) {
metaEncryptor = columnChunkEncryptorRunTime.getMetaDataEncryptor();
dataEncryptor = columnChunkEncryptorRunTime.getDataEncryptor();
dictPageAAD = columnChunkEncryptorRunTime.getDictPageAAD();
dataPageAAD = columnChunkEncryptorRunTime.getDataPageAAD();
dictPageHeaderAAD = columnChunkEncryptorRunTime.getDictPageHeaderAAD();
dataPageHeaderAAD = columnChunkEncryptorRunTime.getDataPageHeaderAAD();
}
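// Column and offset indexes (null if absent) are needed to rebuild page
// statistics and per-page row counts; then seek to the chunk's first page.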
ColumnIndex columnIndex = reader.readColumnIndex(chunk);
OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
reader.setStreamPosition(chunk.getStartingPos());
DictionaryPage dictionaryPage = null;
long readValues = 0;
Statistics statistics = null;
ParquetMetadataConverter converter = new ParquetMetadataConverter();
int pageOrdinal = 0;
long totalChunkValues = chunk.getValueCount();
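// Copy pages one at a time until every value in the chunk has been read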
while (readValues < totalChunkValues) {
PageHeader pageHeader = reader.readPageHeader();
int compressedPageSize = pageHeader.getCompressed_page_size();
byte[] pageLoad;
switch (pageHeader.type) {
case DICTIONARY_PAGE:
if (dictionaryPage != null) {
throw new IOException("Column chunk contains more than one dictionary page");
}
// No quickUpdatePageAAD needed: the dictionary page AAD is not keyed by page ordinal
DictionaryPageHeader dictPageHeader = pageHeader.dictionary_page_header;
pageLoad = processPageLoad(reader,
true,
compressor,
decompressor,
pageHeader.getCompressed_page_size(),
pageHeader.getUncompressed_page_size(),
encryptColumn,
dataEncryptor,
dictPageAAD);
// Keep a reference so a second dictionary page in this chunk trips the check above
dictionaryPage = new DictionaryPage(BytesInput.from(pageLoad),
pageHeader.getUncompressed_page_size(),
dictPageHeader.getNum_values(),
converter.getEncoding(dictPageHeader.getEncoding()));
writer.writeDictionaryPage(dictionaryPage, metaEncryptor, dictPageHeaderAAD);
break;
case DATA_PAGE:
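// AAD must be unique per page: refresh the page-ordinal bytes before encrypting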
if (encryptColumn) {
AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
}
DataPageHeader headerV1 = pageHeader.data_page_header;
pageLoad = processPageLoad(reader,
true,
compressor,
decompressor,
pageHeader.getCompressed_page_size(),
pageHeader.getUncompressed_page_size(),
encryptColumn,
dataEncryptor,
dataPageAAD);
statistics = convertStatistics(
originalCreatedBy, chunk.getPrimitiveType(), headerV1.getStatistics(), columnIndex, pageOrdinal, converter);
readValues += headerV1.getNum_values();
if (offsetIndex != null) {
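// Page row count = (last row index - first row index) + 1, from the offset index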
long rowCount = 1 + offsetIndex.getLastRowIndex(pageOrdinal, totalChunkValues)
- offsetIndex.getFirstRowIndex(pageOrdinal);
writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
pageHeader.getUncompressed_page_size(),
BytesInput.from(pageLoad),
statistics,
toIntWithCheck(rowCount),
converter.getEncoding(headerV1.getRepetition_level_encoding()),
converter.getEncoding(headerV1.getDefinition_level_encoding()),
converter.getEncoding(headerV1.getEncoding()),
metaEncryptor,
dataPageHeaderAAD);
} else {
writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
pageHeader.getUncompressed_page_size(),
BytesInput.from(pageLoad),
statistics,
converter.getEncoding(headerV1.getRepetition_level_encoding()),
converter.getEncoding(headerV1.getDefinition_level_encoding()),
converter.getEncoding(headerV1.getEncoding()),
metaEncryptor,
dataPageHeaderAAD);
}
pageOrdinal++;
break;
case DATA_PAGE_V2:
if (encryptColumn) {
AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
}
DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
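// In the v2 page layout, repetition/definition levels precede the payload and
// are never compressed with the page codec, so they are copied through verbatim.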
int rlLength = headerV2.getRepetition_levels_byte_length();
BytesInput rlLevels = readBlockAllocate(rlLength, reader);
int dlLength = headerV2.getDefinition_levels_byte_length();
BytesInput dlLevels = readBlockAllocate(dlLength, reader);
int payLoadLength = pageHeader.getCompressed_page_size() - rlLength - dlLength;
int rawDataLength = pageHeader.getUncompressed_page_size() - rlLength - dlLength;
pageLoad = processPageLoad(
reader,
headerV2.is_compressed,
compressor,
decompressor,
payLoadLength,
rawDataLength,
encryptColumn,
dataEncryptor,
dataPageAAD);
statistics = convertStatistics(
originalCreatedBy, chunk.getPrimitiveType(), headerV2.getStatistics(), columnIndex, pageOrdinal, converter);
readValues += headerV2.getNum_values();
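// Note: unlike the v1 branch above, this call does not pass metaEncryptor and
// dataPageHeaderAAD, so the v2 page header is written without header encryption.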
writer.writeDataPageV2(headerV2.getNum_rows(),
headerV2.getNum_nulls(),
headerV2.getNum_values(),
rlLevels,
dlLevels,
converter.getEncoding(headerV2.getEncoding()),
BytesInput.from(pageLoad),
rawDataLength,
statistics);
pageOrdinal++;
break;
default:
LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize);
// readPageHeader() consumed only the header; read past the payload as well
// so the stream stays aligned on the next page header.
readBlockAllocate(compressedPageSize, reader);
break;
}
}
}