in paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java [1255:1443]
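        /**
         * Reads the dictionary page (if any) and all data pages of this column chunk into memory,
         * verifying page CRCs when checksum verification is enabled and decrypting page headers
         * when a header decryptor is supplied. The pages are returned wrapped in a
         * {@link ColumnChunkPageReader} that decompresses them on demand.
         */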
public ColumnChunkPageReader readAllPages(
BlockCipher.Decryptor headerBlockDecryptor,
BlockCipher.Decryptor pageBlockDecryptor,
byte[] aadPrefix,
int rowGroupOrdinal,
int columnOrdinal)
throws IOException {
List<DataPage> pagesInChunk = new ArrayList<>();
DictionaryPage dictionaryPage = null;
PrimitiveType type =
getFileMetaData()
.getSchema()
.getType(descriptor.col.getPath())
.asPrimitiveType();
long valuesCountReadSoFar = 0L;
int dataPageCountReadSoFar = 0;
byte[] dataPageHeaderAAD = null;
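            // When the column is encrypted, pre-compute the AAD (additional authenticated data)
            // used to decrypt data page headers; only the page ordinal inside it changes per page.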
if (null != headerBlockDecryptor) {
dataPageHeaderAAD =
AesCipher.createModuleAAD(
aadPrefix,
ModuleType.DataPageHeader,
rowGroupOrdinal,
columnOrdinal,
getPageOrdinal(dataPageCountReadSoFar));
}
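            // Read page headers and bodies until hasMorePages() reports the chunk is exhausted
            // (by value count, or by page count when an offset index is present).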
while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
byte[] pageHeaderAAD = dataPageHeaderAAD;
if (null != headerBlockDecryptor) {
                    // Important: this verifies file integrity (makes sure the dictionary page
                    // has not been removed)
if (null == dictionaryPage && descriptor.metadata.hasDictionaryPage()) {
pageHeaderAAD =
AesCipher.createModuleAAD(
aadPrefix,
ModuleType.DictionaryPageHeader,
rowGroupOrdinal,
columnOrdinal,
-1);
} else {
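                        // For subsequent data pages only the page ordinal in the pre-computed
                        // AAD needs to be refreshed.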
int pageOrdinal = getPageOrdinal(dataPageCountReadSoFar);
AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
}
}
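                // Read the next Thrift page header, decrypting it first if the column is encrypted.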
PageHeader pageHeader = readPageHeader(headerBlockDecryptor, pageHeaderAAD);
int uncompressedPageSize = pageHeader.getUncompressed_page_size();
int compressedPageSize = pageHeader.getCompressed_page_size();
final BytesInput pageBytes;
switch (pageHeader.type) {
case DICTIONARY_PAGE:
                        // the format allows at most one dictionary page per column chunk
if (dictionaryPage != null) {
throw new ParquetDecodingException(
"more than one dictionary page in column " + descriptor.col);
}
pageBytes = this.readAsBytesInput(compressedPageSize);
if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
verifyCrc(
pageHeader.getCrc(),
pageBytes,
"could not verify dictionary page integrity, CRC checksum verification failed");
}
DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
dictionaryPage =
new DictionaryPage(
pageBytes,
uncompressedPageSize,
dicHeader.getNum_values(),
converter.getEncoding(dicHeader.getEncoding()));
                        // Copy the CRC to the new page (used for testing)
if (pageHeader.isSetCrc()) {
dictionaryPage.setCrc(pageHeader.getCrc());
}
break;
case DATA_PAGE:
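                        // V1 data page: repetition levels, definition levels and values are
                        // stored (and compressed) together in a single buffer.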
DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
pageBytes = this.readAsBytesInput(compressedPageSize);
if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
verifyCrc(
pageHeader.getCrc(),
pageBytes,
"could not verify page integrity, CRC checksum verification failed");
}
DataPageV1 dataPageV1 =
new DataPageV1(
pageBytes,
dataHeaderV1.getNum_values(),
uncompressedPageSize,
converter.fromParquetStatistics(
getFileMetaData().getCreatedBy(),
dataHeaderV1.getStatistics(),
type),
converter.getEncoding(
dataHeaderV1.getRepetition_level_encoding()),
converter.getEncoding(
dataHeaderV1.getDefinition_level_encoding()),
converter.getEncoding(dataHeaderV1.getEncoding()));
                        // Copy the CRC to the new page (used for testing)
if (pageHeader.isSetCrc()) {
dataPageV1.setCrc(pageHeader.getCrc());
}
pagesInChunk.add(dataPageV1);
valuesCountReadSoFar += dataHeaderV1.getNum_values();
++dataPageCountReadSoFar;
break;
case DATA_PAGE_V2:
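                        // V2 data page: repetition and definition levels are stored uncompressed,
                        // ahead of the (possibly compressed) values.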
DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
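                        // The level blocks are never compressed, so the values section is the
                        // compressed page size minus both level byte lengths.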
int dataSize =
compressedPageSize
- dataHeaderV2.getRepetition_levels_byte_length()
- dataHeaderV2.getDefinition_levels_byte_length();
final BytesInput repetitionLevels =
this.readAsBytesInput(
dataHeaderV2.getRepetition_levels_byte_length());
final BytesInput definitionLevels =
this.readAsBytesInput(
dataHeaderV2.getDefinition_levels_byte_length());
final BytesInput values = this.readAsBytesInput(dataSize);
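                        // The V2 page CRC covers the levels and the values as written, so they
                        // are concatenated before verification.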
if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
pageBytes =
BytesInput.concat(repetitionLevels, definitionLevels, values);
verifyCrc(
pageHeader.getCrc(),
pageBytes,
"could not verify page integrity, CRC checksum verification failed");
}
DataPageV2 dataPageV2 =
new DataPageV2(
dataHeaderV2.getNum_rows(),
dataHeaderV2.getNum_nulls(),
dataHeaderV2.getNum_values(),
repetitionLevels,
definitionLevels,
converter.getEncoding(dataHeaderV2.getEncoding()),
values,
uncompressedPageSize,
converter.fromParquetStatistics(
getFileMetaData().getCreatedBy(),
dataHeaderV2.getStatistics(),
type),
dataHeaderV2.isIs_compressed());
                        // Copy the CRC to the new page (used for testing)
if (pageHeader.isSetCrc()) {
dataPageV2.setCrc(pageHeader.getCrc());
}
pagesInChunk.add(dataPageV2);
valuesCountReadSoFar += dataHeaderV2.getNum_values();
++dataPageCountReadSoFar;
break;
default:
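                        // Skip pages of unknown type so the reader stays forward compatible
                        // with newer writers.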
LOG.debug(
"skipping page of type {} of size {}",
pageHeader.getType(),
compressedPageSize);
stream.skipFully(compressedPageSize);
break;
}
}
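            // Sanity check: without an offset index, the number of values read must match the
            // count recorded in the chunk metadata.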
if (offsetIndex == null
&& valuesCountReadSoFar != descriptor.metadata.getValueCount()) {
                // It would be nice to have a dedicated CorruptParquetFileException subclass here.
throw new IOException(
"Expected "
+ descriptor.metadata.getValueCount()
+ " values in column chunk at "
+ getPath()
+ " offset "
+ descriptor.metadata.getFirstDataPageOffset()
+ " but got "
+ valuesCountReadSoFar
+ " values instead over "
+ pagesInChunk.size()
+ " pages ending at file offset "
+ (descriptor.fileOffset + stream.position()));
}
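            // The returned reader decompresses (and, if needed, decrypts) the buffered pages
            // on demand using this codec's decompressor.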
BytesInputDecompressor decompressor =
options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec());
return new ColumnChunkPageReader(
decompressor,
pagesInChunk,
dictionaryPage,
offsetIndex,
rowCount,
pageBlockDecryptor,
aadPrefix,
rowGroupOrdinal,
columnOrdinal,
options);
}