in parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java [122:284]
/**
 * Returns the next data page of this column chunk, fully decrypted (when a
 * {@code blockDecryptor} is configured) and decompressed, or {@code null}
 * once all pages have been consumed.
 *
 * <p>When an offset index is available, the page is rebuilt with its first
 * row index and row count so callers can perform row-range filtering. Any
 * CRC present on the compressed page is carried over to the returned page.
 */
public DataPage readPage() {
final DataPage compressedPage = compressedPages.poll();
if (compressedPage == null) {
// All pages of this chunk have been read.
return null;
}
// Index of the page we are about to return; pageIndex already points at
// the next page for the subsequent call.
final int currentPageIndex = pageIndex++;
if (null != blockDecryptor) {
// Refresh the per-page AAD (additional authenticated data) so decryption
// is bound to this page's ordinal within the column chunk.
AesCipher.quickUpdatePageAAD(dataPageAAD, getPageOrdinal(currentPageIndex));
}
return compressedPage.accept(new DataPage.Visitor<DataPage>() {
@Override
public DataPage visit(DataPageV1 dataPageV1) {
try {
BytesInput bytes = dataPageV1.getBytes();
BytesInput decompressed;
// Off-heap path: decrypt and decompress directly on direct ByteBuffers
// to avoid copying page bytes onto the heap.
if (options.getAllocator().isDirect() && options.useOffHeapDecryptBuffer()) {
ByteBuffer byteBuffer = bytes.toByteBuffer();
if (!byteBuffer.isDirect()) {
throw new ParquetDecodingException("Expected a direct buffer");
}
if (blockDecryptor != null) {
// Decrypt in place / into another direct buffer before decompression.
byteBuffer = blockDecryptor.decrypt(byteBuffer, dataPageAAD);
}
long compressedSize = byteBuffer.limit();
// NOTE(review): this direct buffer is not visibly released here —
// presumably ownership transfers to the returned page and it is
// freed by the page-release machinery; confirm against the caller.
ByteBuffer decompressedBuffer =
options.getAllocator().allocate(dataPageV1.getUncompressedSize());
decompressor.decompress(byteBuffer, (int) compressedSize, decompressedBuffer,
dataPageV1.getUncompressedSize());
// HACKY: sometimes we need to do `flip` because the position of output bytebuffer is
// not reset.
if (decompressedBuffer.position() != 0) {
decompressedBuffer.flip();
}
decompressed = BytesInput.from(decompressedBuffer);
} else { // use on-heap buffer
if (null != blockDecryptor) {
// Decrypt on-heap; the decrypted bytes replace the raw page bytes.
bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
}
decompressed = decompressor.decompress(bytes, dataPageV1.getUncompressedSize());
}
final DataPageV1 decompressedPage;
if (offsetIndex == null) {
// No offset index: rebuild the page without row-index information.
decompressedPage = new DataPageV1(
decompressed,
dataPageV1.getValueCount(),
dataPageV1.getUncompressedSize(),
dataPageV1.getStatistics(),
dataPageV1.getRlEncoding(),
dataPageV1.getDlEncoding(),
dataPageV1.getValueEncoding());
} else {
// Offset index available: attach first row index and the page's row
// count (inclusive range, hence the +1).
long firstRowIndex = offsetIndex.getFirstRowIndex(currentPageIndex);
decompressedPage = new DataPageV1(
decompressed,
dataPageV1.getValueCount(),
dataPageV1.getUncompressedSize(),
firstRowIndex,
Math.toIntExact(offsetIndex.getLastRowIndex(currentPageIndex, rowCount) - firstRowIndex + 1),
dataPageV1.getStatistics(),
dataPageV1.getRlEncoding(),
dataPageV1.getDlEncoding(),
dataPageV1.getValueEncoding());
}
if (dataPageV1.getCrc().isPresent()) {
// Preserve the CRC of the original page so downstream integrity
// checks still have it available.
decompressedPage.setCrc(dataPageV1.getCrc().getAsInt());
}
return decompressedPage;
} catch (IOException e) {
throw new ParquetDecodingException("could not decompress page", e);
}
}
@Override
public DataPage visit(DataPageV2 dataPageV2) {
// Fast path: nothing to decompress, no row-index info to attach, and no
// decryption required — the page can be returned as-is.
if (!dataPageV2.isCompressed() && offsetIndex == null && null == blockDecryptor) {
return dataPageV2;
}
BytesInput pageBytes = dataPageV2.getData();
try {
BytesInput decompressed;
long compressedSize;
// Off-heap path: operate on direct ByteBuffers (see V1 visitor above).
if (options.getAllocator().isDirect() && options.useOffHeapDecryptBuffer()) {
ByteBuffer byteBuffer = pageBytes.toByteBuffer();
if (!byteBuffer.isDirect()) {
throw new ParquetDecodingException("Expected a direct buffer");
}
if (blockDecryptor != null) {
byteBuffer = blockDecryptor.decrypt(byteBuffer, dataPageAAD);
}
compressedSize = byteBuffer.limit();
if (dataPageV2.isCompressed()) {
// In V2 pages only the data section is compressed; rep/def levels
// are stored uncompressed, so subtract their sizes.
int uncompressedSize = Math.toIntExact(
dataPageV2.getUncompressedSize()
- dataPageV2.getDefinitionLevels().size()
- dataPageV2.getRepetitionLevels().size());
ByteBuffer decompressedBuffer =
options.getAllocator().allocate(uncompressedSize);
decompressor.decompress(byteBuffer, (int) compressedSize, decompressedBuffer,
uncompressedSize);
// HACKY: sometimes we need to do `flip` because the position of output bytebuffer is
// not reset.
if (decompressedBuffer.position() != 0) {
decompressedBuffer.flip();
}
pageBytes = BytesInput.from(decompressedBuffer);
} else {
// Page was encrypted but not compressed: use the decrypted bytes.
pageBytes = BytesInput.from(byteBuffer);
}
} else {
// On-heap path: decrypt (if needed) then decompress (if needed).
if (null != blockDecryptor) {
pageBytes = BytesInput.from(
blockDecryptor.decrypt(pageBytes.toByteArray(), dataPageAAD));
}
if (dataPageV2.isCompressed()) {
// Data-section size excludes the uncompressed rep/def levels.
int uncompressedSize = Math.toIntExact(
dataPageV2.getUncompressedSize()
- dataPageV2.getDefinitionLevels().size()
- dataPageV2.getRepetitionLevels().size());
pageBytes = decompressor.decompress(pageBytes, uncompressedSize);
}
}
} catch (IOException e) {
throw new ParquetDecodingException("could not decompress page", e);
}
final DataPageV2 decompressedPage;
if (offsetIndex == null) {
decompressedPage = DataPageV2.uncompressed(
dataPageV2.getRowCount(),
dataPageV2.getNullCount(),
dataPageV2.getValueCount(),
dataPageV2.getRepetitionLevels(),
dataPageV2.getDefinitionLevels(),
dataPageV2.getDataEncoding(),
pageBytes,
dataPageV2.getStatistics());
} else {
// Attach the first row index from the offset index; V2 pages already
// carry their own row count, so no range arithmetic is needed here.
decompressedPage = DataPageV2.uncompressed(
dataPageV2.getRowCount(),
dataPageV2.getNullCount(),
dataPageV2.getValueCount(),
offsetIndex.getFirstRowIndex(currentPageIndex),
dataPageV2.getRepetitionLevels(),
dataPageV2.getDefinitionLevels(),
dataPageV2.getDataEncoding(),
pageBytes,
dataPageV2.getStatistics());
}
if (dataPageV2.getCrc().isPresent()) {
// Carry the original page's CRC over to the rebuilt page.
decompressedPage.setCrc(dataPageV2.getCrc().getAsInt());
}
return decompressedPage;
}
});
}