in parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java [578:788]
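/**
 * Copies one column chunk page by page: the dictionary page (if present) first, then
 * each v1/v2 data page, re-compressing when a new codec was requested and encrypting
 * when a ColumnChunkEncryptorRunTime is supplied. Page statistics are converted along
 * the way, and if no page carried statistics the column-level statistics are
 * invalidated at the end.
 */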
private void processChunk(
TransParquetFileReader reader,
long blockRowCount,
ColumnChunkMetaData chunk,
CompressionCodecName newCodecName,
ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime,
boolean encryptColumn,
BloomFilter bloomFilter,
ColumnIndex columnIndex,
OffsetIndex offsetIndex,
String originalCreatedBy)
throws IOException {
CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
CompressionCodecFactory.BytesInputDecompressor decompressor = null;
CompressionCodecFactory.BytesInputCompressor compressor = null;
if (!newCodecName.equals(chunk.getCodec())) {
// Re-compress only if a different codec has been specified
decompressor = codecFactory.getDecompressor(chunk.getCodec());
compressor = codecFactory.getCompressor(newCodecName);
}
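// When the codec is unchanged, both stay null and processPageLoad copies page payloads through with their original compression.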
// EncryptorRunTime is only provided when encryption is required
BlockCipher.Encryptor metaEncryptor = null;
BlockCipher.Encryptor dataEncryptor = null;
byte[] dictPageAAD = null;
byte[] dataPageAAD = null;
byte[] dictPageHeaderAAD = null;
byte[] dataPageHeaderAAD = null;
if (columnChunkEncryptorRunTime != null) {
metaEncryptor = columnChunkEncryptorRunTime.getMetaDataEncryptor();
dataEncryptor = columnChunkEncryptorRunTime.getDataEncryptor();
dictPageAAD = columnChunkEncryptorRunTime.getDictPageAAD();
dataPageAAD = columnChunkEncryptorRunTime.getDataPageAAD();
dictPageHeaderAAD = columnChunkEncryptorRunTime.getDictPageHeaderAAD();
dataPageHeaderAAD = columnChunkEncryptorRunTime.getDataPageHeaderAAD();
}
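// Carry any existing bloom filter over to the rewritten file, keyed by the normalized dotted column path.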
if (bloomFilter != null) {
writer.addBloomFilter(normalizeFieldsInPath(chunk.getPath()).toDotString(), bloomFilter);
}
reader.setStreamPosition(chunk.getStartingPos());
DictionaryPage dictionaryPage = null;
long readValues = 0L;
long readRows = 0L;
Statistics<?> statistics = null;
boolean isColumnStatisticsMalformed = false;
ParquetMetadataConverter converter = new ParquetMetadataConverter();
int pageOrdinal = 0;
long totalChunkValues = chunk.getValueCount();
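// Walk the chunk until all values are consumed. pageOrdinal counts data pages only (the dictionary page is excluded); it drives the per-page AAD updates and the column-index lookups.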
while (readValues < totalChunkValues) {
PageHeader pageHeader = reader.readPageHeader();
int compressedPageSize = pageHeader.getCompressed_page_size();
byte[] pageLoad;
switch (pageHeader.type) {
case DICTIONARY_PAGE:
if (dictionaryPage != null) {
throw new IOException("Column chunk has more than one dictionary page: " + chunk);
}
// No quickUpdatePageAAD needed for the dictionary page
DictionaryPageHeader dictPageHeader = pageHeader.dictionary_page_header;
pageLoad = processPageLoad(
reader,
true,
compressor,
decompressor,
pageHeader.getCompressed_page_size(),
pageHeader.getUncompressed_page_size(),
encryptColumn,
dataEncryptor,
dictPageAAD);
dictionaryPage = new DictionaryPage(
BytesInput.from(pageLoad),
pageHeader.getUncompressed_page_size(),
dictPageHeader.getNum_values(),
converter.getEncoding(dictPageHeader.getEncoding()));
writer.writeDictionaryPage(dictionaryPage, metaEncryptor, dictPageHeaderAAD);
break;
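// v1 data page: the module AADs embed the page ordinal, so both the page and page-header AADs must be refreshed for every data page.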
case DATA_PAGE:
if (encryptColumn) {
AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
}
DataPageHeader headerV1 = pageHeader.data_page_header;
pageLoad = processPageLoad(
reader,
true,
compressor,
decompressor,
pageHeader.getCompressed_page_size(),
pageHeader.getUncompressed_page_size(),
encryptColumn,
dataEncryptor,
dataPageAAD);
statistics = convertStatistics(
originalCreatedBy,
normalizeNameInType(chunk.getPrimitiveType()),
headerV1.getStatistics(),
columnIndex,
pageOrdinal,
converter);
if (statistics == null) {
// Reaching here means both the columnIndex and the page-header statistics are null
isColumnStatisticsMalformed = true;
} else {
Preconditions.checkState(
!isColumnStatisticsMalformed,
"Detected mixed null page statistics and non-null page statistics");
}
readValues += headerV1.getNum_values();
if (offsetIndex != null) {
long rowCount = 1
+ offsetIndex.getLastRowIndex(pageOrdinal, blockRowCount)
- offsetIndex.getFirstRowIndex(pageOrdinal);
readRows += rowCount;
writer.writeDataPage(
toIntWithCheck(headerV1.getNum_values()),
pageHeader.getUncompressed_page_size(),
BytesInput.from(pageLoad),
statistics,
toIntWithCheck(rowCount),
converter.getEncoding(headerV1.getRepetition_level_encoding()),
converter.getEncoding(headerV1.getDefinition_level_encoding()),
converter.getEncoding(headerV1.getEncoding()),
metaEncryptor,
dataPageHeaderAAD);
} else {
writer.writeDataPage(
toIntWithCheck(headerV1.getNum_values()),
pageHeader.getUncompressed_page_size(),
BytesInput.from(pageLoad),
statistics,
converter.getEncoding(headerV1.getRepetition_level_encoding()),
converter.getEncoding(headerV1.getDefinition_level_encoding()),
converter.getEncoding(headerV1.getEncoding()),
metaEncryptor,
dataPageHeaderAAD);
}
pageOrdinal++;
break;
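// v2 data page: repetition and definition levels sit uncompressed ahead of the payload, so they are copied verbatim and only the remaining payload bytes are re-compressed or re-encrypted.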
case DATA_PAGE_V2:
if (encryptColumn) {
AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
}
DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
int rlLength = headerV2.getRepetition_levels_byte_length();
BytesInput rlLevels = readBlockAllocate(rlLength, reader);
int dlLength = headerV2.getDefinition_levels_byte_length();
BytesInput dlLevels = readBlockAllocate(dlLength, reader);
int payLoadLength = pageHeader.getCompressed_page_size() - rlLength - dlLength;
int rawDataLength = pageHeader.getUncompressed_page_size() - rlLength - dlLength;
pageLoad = processPageLoad(
reader,
headerV2.is_compressed,
compressor,
decompressor,
payLoadLength,
rawDataLength,
encryptColumn,
dataEncryptor,
dataPageAAD);
statistics = convertStatistics(
originalCreatedBy,
normalizeNameInType(chunk.getPrimitiveType()),
headerV2.getStatistics(),
columnIndex,
pageOrdinal,
converter);
if (statistics == null) {
// Reaching here means both the columnIndex and the page-header statistics are null
isColumnStatisticsMalformed = true;
} else {
Preconditions.checkState(
!isColumnStatisticsMalformed,
"Detected mixed null page statistics and non-null page statistics");
}
readValues += headerV2.getNum_values();
readRows += headerV2.getNum_rows();
writer.writeDataPageV2(
headerV2.getNum_rows(),
headerV2.getNum_nulls(),
headerV2.getNum_values(),
rlLevels,
dlLevels,
converter.getEncoding(headerV2.getEncoding()),
BytesInput.from(pageLoad),
headerV2.is_compressed,
rawDataLength,
statistics,
metaEncryptor,
dataPageHeaderAAD);
pageOrdinal++;
break;
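// Any other page type is not copied; it is only logged below.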
default:
LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize);
break;
}
}
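// readRows stays 0 when the chunk had neither an offset index nor v2 pages; otherwise the pages must account for every row in the block.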
Preconditions.checkState(
readRows == 0 || readRows == blockRowCount,
"Read row count: %s not match with block total row count: %s",
readRows,
blockRowCount);
if (isColumnStatisticsMalformed) {
// No page carried valid statistics, so the column-level statistics must be overwritten
writer.invalidateStatistics(chunk.getStatistics());
}
}
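// Usage sketch: processChunk is private and driven by ParquetRewriter.processBlocks();
// callers go through the public rewrite API, roughly as below. The exact builder
// method names are an assumption and may differ across releases:
//
//   RewriteOptions options = new RewriteOptions.Builder(conf, inputPath, outputPath)
//       .transform(CompressionCodecName.ZSTD) // triggers the re-compression path above
//       .build();
//   try (ParquetRewriter rewriter = new ParquetRewriter(options)) {
//     rewriter.processBlocks();
//   }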