in parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java [1869:2024]
public ColumnChunkPageReader readAllPages(
    BlockCipher.Decryptor headerBlockDecryptor,
    BlockCipher.Decryptor pageBlockDecryptor,
    byte[] aadPrefix,
    int rowGroupOrdinal,
    int columnOrdinal)
    throws IOException {
  List<DataPage> pagesInChunk = new ArrayList<>();
  DictionaryPage dictionaryPage = null;
  PrimitiveType type = getFileMetaData()
      .getSchema()
      .getType(descriptor.col.getPath())
      .asPrimitiveType();
  long valuesCountReadSoFar = 0L;
  int dataPageCountReadSoFar = 0;
  byte[] dataPageHeaderAAD = null;
  if (null != headerBlockDecryptor) {
    dataPageHeaderAAD = AesCipher.createModuleAAD(
        aadPrefix,
        ModuleType.DataPageHeader,
        rowGroupOrdinal,
        columnOrdinal,
        getPageOrdinal(dataPageCountReadSoFar));
  }
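  // Keep reading page headers and their payloads until hasMorePages() indicates that the expected
  // number of values (or, when an offset index is present, data pages) has been consumed.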
  while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
    byte[] pageHeaderAAD = dataPageHeaderAAD;
    if (null != headerBlockDecryptor) {
      // Important: this verifies file integrity (makes sure the dictionary page has not been removed)
      if (null == dictionaryPage && descriptor.metadata.hasDictionaryPage()) {
        pageHeaderAAD = AesCipher.createModuleAAD(
            aadPrefix, ModuleType.DictionaryPageHeader, rowGroupOrdinal, columnOrdinal, -1);
      } else {
        int pageOrdinal = getPageOrdinal(dataPageCountReadSoFar);
        AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
      }
    }
    PageHeader pageHeader = readPageHeader(headerBlockDecryptor, pageHeaderAAD);
    int uncompressedPageSize = pageHeader.getUncompressed_page_size();
    int compressedPageSize = pageHeader.getCompressed_page_size();
    final BytesInput pageBytes;
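    // A column chunk holds at most one dictionary page followed by data pages (v1 or v2);
    // any other page type is skipped in the default branch below.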
    switch (pageHeader.type) {
      case DICTIONARY_PAGE:
        // there is only one dictionary page per column chunk
        if (dictionaryPage != null) {
          throw new ParquetDecodingException(
              "more than one dictionary page in column " + descriptor.col);
        }
        pageBytes = this.readAsBytesInput(compressedPageSize);
        if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
          verifyCrc(
              pageHeader.getCrc(),
              pageBytes,
              "could not verify dictionary page integrity, CRC checksum verification failed");
        }
        DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
        dictionaryPage = new DictionaryPage(
            pageBytes,
            uncompressedPageSize,
            dicHeader.getNum_values(),
            converter.getEncoding(dicHeader.getEncoding()));
        // Copy crc to new page, used for testing
        if (pageHeader.isSetCrc()) {
          dictionaryPage.setCrc(pageHeader.getCrc());
        }
        break;
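      // DATA_PAGE (v1): repetition levels, definition levels and values share a single
      // (possibly compressed) buffer, so the whole page is read as one BytesInput.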
      case DATA_PAGE:
        DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
        pageBytes = this.readAsBytesInput(compressedPageSize);
        if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
          verifyCrc(
              pageHeader.getCrc(),
              pageBytes,
              "could not verify page integrity, CRC checksum verification failed");
        }
        DataPageV1 dataPageV1 = new DataPageV1(
            pageBytes,
            dataHeaderV1.getNum_values(),
            uncompressedPageSize,
            converter.fromParquetStatistics(
                getFileMetaData().getCreatedBy(), dataHeaderV1.getStatistics(), type),
            converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
            converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
            converter.getEncoding(dataHeaderV1.getEncoding()));
        // Copy crc to new page, used for testing
        if (pageHeader.isSetCrc()) {
          dataPageV1.setCrc(pageHeader.getCrc());
        }
        pagesInChunk.add(dataPageV1);
        valuesCountReadSoFar += dataHeaderV1.getNum_values();
        ++dataPageCountReadSoFar;
        break;
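      // DATA_PAGE_V2: repetition and definition levels are stored uncompressed ahead of the values,
      // so they are read as separate BytesInputs; only the values section honours is_compressed.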
      case DATA_PAGE_V2:
        DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
        int dataSize = compressedPageSize
            - dataHeaderV2.getRepetition_levels_byte_length()
            - dataHeaderV2.getDefinition_levels_byte_length();
        final BytesInput repetitionLevels =
            this.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length());
        final BytesInput definitionLevels =
            this.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length());
        final BytesInput values = this.readAsBytesInput(dataSize);
        if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
          pageBytes = BytesInput.concat(repetitionLevels, definitionLevels, values);
          verifyCrc(
              pageHeader.getCrc(),
              pageBytes,
              "could not verify page integrity, CRC checksum verification failed");
        }
        DataPageV2 dataPageV2 = new DataPageV2(
            dataHeaderV2.getNum_rows(),
            dataHeaderV2.getNum_nulls(),
            dataHeaderV2.getNum_values(),
            repetitionLevels,
            definitionLevels,
            converter.getEncoding(dataHeaderV2.getEncoding()),
            values,
            uncompressedPageSize,
            converter.fromParquetStatistics(
                getFileMetaData().getCreatedBy(), dataHeaderV2.getStatistics(), type),
            dataHeaderV2.isIs_compressed());
        // Copy crc to new page, used for testing
        if (pageHeader.isSetCrc()) {
          dataPageV2.setCrc(pageHeader.getCrc());
        }
        pagesInChunk.add(dataPageV2);
        valuesCountReadSoFar += dataHeaderV2.getNum_values();
        ++dataPageCountReadSoFar;
        break;
      default:
        LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize);
        stream.skipFully(compressedPageSize);
        break;
    }
  }
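  // When there is no offset index, cross-check the number of values actually read against the
  // column metadata to catch truncated or corrupt chunks.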
  if (offsetIndex == null && valuesCountReadSoFar != descriptor.metadata.getValueCount()) {
    // Would be nice to have a CorruptParquetFileException or something as a subclass?
    throw new IOException(
        "Expected " + descriptor.metadata.getValueCount() + " values in column chunk at " + getPath()
            + " offset " + descriptor.metadata.getFirstDataPageOffset() + " but got "
            + valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
            + " pages ending at file offset " + (descriptor.fileOffset + stream.position()));
  }
  BytesInputDecompressor decompressor =
      options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec());
  return new ColumnChunkPageReader(
      decompressor,
      pagesInChunk,
      dictionaryPage,
      offsetIndex,
      rowCount,
      pageBlockDecryptor,
      aadPrefix,
      rowGroupOrdinal,
      columnOrdinal,
      options);
}