in common/src/main/java/org/apache/comet/parquet/FileReader.java [859:1019]
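// Reads every page of the current column chunk into memory: the optional dictionary page
// plus all data pages (v1 and v2), verifying CRCs and decrypting page headers when the
// file is encrypted, and returns them wrapped in a ColumnPageReader.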
private ColumnPageReader readAllPages(
BlockCipher.Decryptor headerBlockDecryptor,
BlockCipher.Decryptor pageBlockDecryptor,
byte[] aadPrefix,
int rowGroupOrdinal,
int columnOrdinal)
throws IOException {
List<DataPage> pagesInChunk = new ArrayList<>();
DictionaryPage dictionaryPage = null;
PrimitiveType type =
fileMetaData.getSchema().getType(descriptor.col.getPath()).asPrimitiveType();
long valuesCountReadSoFar = 0;
int dataPageCountReadSoFar = 0;
byte[] dataPageHeaderAAD = null;
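// For encrypted files, prepare the AAD (additional authenticated data) used when
// decrypting data page headers; its page-ordinal portion is updated as each page is read.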
if (null != headerBlockDecryptor) {
dataPageHeaderAAD =
AesCipher.createModuleAAD(
aadPrefix,
ModuleCipherFactory.ModuleType.DataPageHeader,
rowGroupOrdinal,
columnOrdinal,
getPageOrdinal(dataPageCountReadSoFar));
}
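// Keep reading until hasMorePages(...) reports that the expected number of values
// (or pages, when an offset index is present) has been consumed.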
while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
byte[] pageHeaderAAD = dataPageHeaderAAD;
if (null != headerBlockDecryptor) {
// Important: this verifies file integrity (makes sure the dictionary page has not been
// removed)
if (null == dictionaryPage && descriptor.metadata.hasDictionaryPage()) {
pageHeaderAAD =
AesCipher.createModuleAAD(
aadPrefix,
ModuleCipherFactory.ModuleType.DictionaryPageHeader,
rowGroupOrdinal,
columnOrdinal,
-1);
} else {
int pageOrdinal = getPageOrdinal(dataPageCountReadSoFar);
AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
}
}
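// Read (and, if encrypted, decrypt) the next page header using the AAD computed above.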
PageHeader pageHeader = readPageHeader(headerBlockDecryptor, pageHeaderAAD);
int uncompressedPageSize = pageHeader.getUncompressed_page_size();
int compressedPageSize = pageHeader.getCompressed_page_size();
final BytesInput pageBytes;
switch (pageHeader.type) {
case DICTIONARY_PAGE:
// there is only one dictionary page per column chunk
if (dictionaryPage != null) {
throw new ParquetDecodingException(
"more than one dictionary page in column " + descriptor.col);
}
pageBytes = this.readAsBytesInput(compressedPageSize);
if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
verifyCrc(
pageHeader.getCrc(),
pageBytes.toByteArray(),
"could not verify dictionary page integrity, CRC checksum verification failed");
}
DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
dictionaryPage =
new DictionaryPage(
pageBytes,
uncompressedPageSize,
dicHeader.getNum_values(),
converter.getEncoding(dicHeader.getEncoding()));
// Copy crc to new page, used for testing
if (pageHeader.isSetCrc()) {
dictionaryPage.setCrc(pageHeader.getCrc());
}
break;
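// v1 data page: repetition levels, definition levels and values are stored together
// in a single (optionally compressed) body.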
case DATA_PAGE:
DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
pageBytes = this.readAsBytesInput(compressedPageSize);
if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
verifyCrc(
pageHeader.getCrc(),
pageBytes.toByteArray(),
"could not verify page integrity, CRC checksum verification failed");
}
DataPageV1 dataPageV1 =
new DataPageV1(
pageBytes,
dataHeaderV1.getNum_values(),
uncompressedPageSize,
converter.fromParquetStatistics(
getFileMetaData().getCreatedBy(), dataHeaderV1.getStatistics(), type),
converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
converter.getEncoding(dataHeaderV1.getEncoding()));
// Copy crc to new page, used for testing
if (pageHeader.isSetCrc()) {
dataPageV1.setCrc(pageHeader.getCrc());
}
pagesInChunk.add(dataPageV1);
valuesCountReadSoFar += dataHeaderV1.getNum_values();
++dataPageCountReadSoFar;
break;
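// v2 data page: repetition and definition levels are stored uncompressed ahead of the
// values, so the value payload size is the compressed size minus both level lengths.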
case DATA_PAGE_V2:
DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
int dataSize =
compressedPageSize
- dataHeaderV2.getRepetition_levels_byte_length()
- dataHeaderV2.getDefinition_levels_byte_length();
pagesInChunk.add(
new DataPageV2(
dataHeaderV2.getNum_rows(),
dataHeaderV2.getNum_nulls(),
dataHeaderV2.getNum_values(),
this.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length()),
this.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length()),
converter.getEncoding(dataHeaderV2.getEncoding()),
this.readAsBytesInput(dataSize),
uncompressedPageSize,
converter.fromParquetStatistics(
getFileMetaData().getCreatedBy(), dataHeaderV2.getStatistics(), type),
dataHeaderV2.isIs_compressed()));
valuesCountReadSoFar += dataHeaderV2.getNum_values();
++dataPageCountReadSoFar;
break;
default:
LOG.debug(
"skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize);
stream.skipFully(compressedPageSize);
break;
}
}
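// Without an offset index to drive the loop, cross-check the values actually read
// against the column chunk metadata to detect a corrupt or truncated chunk.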
if (offsetIndex == null && valuesCountReadSoFar != descriptor.metadata.getValueCount()) {
// It would be nice to have a CorruptParquetFileException (or similar) subclass for this.
throw new IOException(
"Expected "
+ descriptor.metadata.getValueCount()
+ " values in column chunk at "
+ file
+ " offset "
+ descriptor.metadata.getFirstDataPageOffset()
+ " but got "
+ valuesCountReadSoFar
+ " values instead over "
+ pagesInChunk.size()
+ " pages ending at file offset "
+ (descriptor.fileOffset + stream.position()));
}
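// Pages are still compressed at this point; hand them to ColumnPageReader together with
// a decompressor for this chunk's codec and the decryption context.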
CompressionCodecFactory.BytesInputDecompressor decompressor =
options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec());
return new ColumnPageReader(
decompressor,
pagesInChunk,
dictionaryPage,
offsetIndex,
blocks.get(currentBlock).getRowCount(),
pageBlockDecryptor,
aadPrefix,
rowGroupOrdinal,
columnOrdinal);
}