in parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java [133:295]
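/**
 * Returns the next data page in this column chunk, decrypting and decompressing it as
 * needed, or null once all pages have been read.
 */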
public DataPage readPage() {
final DataPage compressedPage = compressedPages.poll();
if (compressedPage == null) {
return null;
}
final int currentPageIndex = pageIndex++;
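// For encrypted columns, refresh the AAD (additional authenticated data) with this
// page's ordinal so the decryptor authenticates exactly this page.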
if (blockDecryptor != null) {
AesCipher.quickUpdatePageAAD(dataPageAAD, getPageOrdinal(currentPageIndex));
}
return compressedPage.accept(new DataPage.Visitor<DataPage>() {
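// v1 pages: repetition levels, definition levels, and values are compressed together
// as one block, so the whole page body is decompressed at once.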
@Override
public DataPage visit(DataPageV1 dataPageV1) {
try {
BytesInput bytes = dataPageV1.getBytes();
BytesInput decompressed;
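// Off-heap path: when the allocator hands out direct buffers and off-heap decryption
// is enabled, decrypt and decompress without copying through the Java heap.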
if (options.getAllocator().isDirect() && options.useOffHeapDecryptBuffer()) {
ByteBuffer byteBuffer = bytes.toByteBuffer(releaser);
if (!byteBuffer.isDirect()) {
throw new ParquetDecodingException("Expected a direct buffer");
}
if (blockDecryptor != null) {
byteBuffer = blockDecryptor.decrypt(byteBuffer, dataPageAAD);
}
int compressedSize = byteBuffer.limit();
ByteBuffer decompressedBuffer =
options.getAllocator().allocate(dataPageV1.getUncompressedSize());
releaser.releaseLater(decompressedBuffer);
long start = System.nanoTime();
decompressor.decompress(
byteBuffer,
compressedSize,
decompressedBuffer,
dataPageV1.getUncompressedSize());
setDecompressMetrics(bytes, start);
decompressedBuffer.flip();
decompressed = BytesInput.from(decompressedBuffer);
} else { // use on-heap buffer
if (blockDecryptor != null) {
bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
}
long start = System.nanoTime();
decompressed = decompressor.decompress(bytes, dataPageV1.getUncompressedSize());
setDecompressMetrics(bytes, start);
}
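// Rebuild the page around the decompressed bytes; with an offset index available we
// can also attach the page's first row index and exact row count.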
final DataPageV1 decompressedPage;
if (offsetIndex == null) {
decompressedPage = new DataPageV1(
decompressed,
dataPageV1.getValueCount(),
dataPageV1.getUncompressedSize(),
dataPageV1.getStatistics(),
dataPageV1.getRlEncoding(),
dataPageV1.getDlEncoding(),
dataPageV1.getValueEncoding());
} else {
long firstRowIndex = offsetIndex.getFirstRowIndex(currentPageIndex);
decompressedPage = new DataPageV1(
decompressed,
dataPageV1.getValueCount(),
dataPageV1.getUncompressedSize(),
firstRowIndex,
Math.toIntExact(offsetIndex.getLastRowIndex(currentPageIndex, rowCount)
- firstRowIndex
+ 1),
dataPageV1.getStatistics(),
dataPageV1.getRlEncoding(),
dataPageV1.getDlEncoding(),
dataPageV1.getValueEncoding());
}
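// Preserve the page-level CRC, if one was written.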
if (dataPageV1.getCrc().isPresent()) {
decompressedPage.setCrc(dataPageV1.getCrc().getAsInt());
}
return decompressedPage;
} catch (IOException e) {
throw new ParquetDecodingException("could not decompress page", e);
}
}
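
// v2 pages: repetition and definition levels are stored uncompressed and separately
// from the values, so only the data section may need decompressing.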
@Override
public DataPage visit(DataPageV2 dataPageV2) {
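// Nothing to decrypt, decompress, or re-index: return the page unchanged.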
if (!dataPageV2.isCompressed() && offsetIndex == null && blockDecryptor == null) {
return dataPageV2;
}
BytesInput pageBytes = dataPageV2.getData();
try {
if (options.getAllocator().isDirect() && options.useOffHeapDecryptBuffer()) {
ByteBuffer byteBuffer = pageBytes.toByteBuffer(releaser);
if (!byteBuffer.isDirect()) {
throw new ParquetDecodingException("Expected a direct buffer");
}
if (blockDecryptor != null) {
byteBuffer = blockDecryptor.decrypt(byteBuffer, dataPageAAD);
}
int compressedSize = byteBuffer.limit();
if (dataPageV2.isCompressed()) {
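// Only the values section is compressed in v2; subtract the uncompressed level
// sections from the page's total uncompressed size.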
int uncompressedSize = Math.toIntExact(dataPageV2.getUncompressedSize()
- dataPageV2.getDefinitionLevels().size()
- dataPageV2.getRepetitionLevels().size());
ByteBuffer decompressedBuffer =
options.getAllocator().allocate(uncompressedSize);
releaser.releaseLater(decompressedBuffer);
long start = System.nanoTime();
decompressor.decompress(
byteBuffer, compressedSize, decompressedBuffer, uncompressedSize);
setDecompressMetrics(pageBytes, start);
decompressedBuffer.flip();
pageBytes = BytesInput.from(decompressedBuffer);
} else {
pageBytes = BytesInput.from(byteBuffer);
}
} else {
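// On-heap path: decrypt (if needed) into a byte array, then decompress.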
if (blockDecryptor != null) {
pageBytes =
BytesInput.from(blockDecryptor.decrypt(pageBytes.toByteArray(), dataPageAAD));
}
if (dataPageV2.isCompressed()) {
int uncompressedSize = Math.toIntExact(dataPageV2.getUncompressedSize()
- dataPageV2.getDefinitionLevels().size()
- dataPageV2.getRepetitionLevels().size());
long start = System.nanoTime();
// Record metrics against the compressed input, consistent with the other decompress paths.
BytesInput compressedBytes = pageBytes;
pageBytes = decompressor.decompress(pageBytes, uncompressedSize);
setDecompressMetrics(compressedBytes, start);
}
}
} catch (IOException e) {
throw new ParquetDecodingException("could not decompress page", e);
}
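// Rebuild the page as uncompressed; with an offset index we also attach the page's
// first row index.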
final DataPageV2 decompressedPage;
if (offsetIndex == null) {
decompressedPage = DataPageV2.uncompressed(
dataPageV2.getRowCount(),
dataPageV2.getNullCount(),
dataPageV2.getValueCount(),
dataPageV2.getRepetitionLevels(),
dataPageV2.getDefinitionLevels(),
dataPageV2.getDataEncoding(),
pageBytes,
dataPageV2.getStatistics());
} else {
decompressedPage = DataPageV2.uncompressed(
dataPageV2.getRowCount(),
dataPageV2.getNullCount(),
dataPageV2.getValueCount(),
offsetIndex.getFirstRowIndex(currentPageIndex),
dataPageV2.getRepetitionLevels(),
dataPageV2.getDefinitionLevels(),
dataPageV2.getDataEncoding(),
pageBytes,
dataPageV2.getStatistics());
}
if (dataPageV2.getCrc().isPresent()) {
decompressedPage.setCrc(dataPageV2.getCrc().getAsInt());
}
return decompressedPage;
}
});
}