in parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java [282:371]
public void writePageV2(
    int rowCount,
    int nullCount,
    int valueCount,
    BytesInput repetitionLevels,
    BytesInput definitionLevels,
    Encoding dataEncoding,
    BytesInput data,
    Statistics<?> statistics,
    SizeStatistics sizeStatistics)
    throws IOException {
  pageOrdinal++;

  int rlByteLength = toIntWithCheck(repetitionLevels.size());
  int dlByteLength = toIntWithCheck(definitionLevels.size());
  int uncompressedSize = toIntWithCheck(data.size() + repetitionLevels.size() + definitionLevels.size());
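
  // Only the data section is compressed; the repetition and definition levels
  // of a V2 data page are written uncompressed.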
  boolean compressed = false;
  BytesInput compressedData = BytesInput.empty();
  if (data.size() > 0) {
    // TODO: decide if we compress
    compressedData = compressor.compress(data);
    compressed = true;
  }
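
  // If column encryption is enabled, encrypt the (possibly compressed) data
  // section using an AAD updated with this page's ordinal.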
  if (null != pageBlockEncryptor) {
    AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
    compressedData = BytesInput.from(pageBlockEncryptor.encrypt(compressedData.toByteArray(), dataPageAAD));
  }

  int compressedSize =
      toIntWithCheck(compressedData.size() + repetitionLevels.size() + definitionLevels.size());
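
  // The page header is serialized into tempOutputStream first, so its size is
  // known for the offset index entry and it can be placed ahead of the page body.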
  tempOutputStream.reset();
  if (null != headerBlockEncryptor) {
    AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
  }
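
  // With page checksums enabled, compute a CRC over the levels and the data
  // section and embed it in the page header; otherwise write the header without it.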
  if (pageWriteChecksumEnabled) {
    crc.reset();
    if (repetitionLevels.size() > 0) {
      crc.update(repetitionLevels.toByteArray());
    }
    if (definitionLevels.size() > 0) {
      crc.update(definitionLevels.toByteArray());
    }
    if (compressedData.size() > 0) {
      crc.update(compressedData.toByteArray());
    }
    parquetMetadataConverter.writeDataPageV2Header(
        uncompressedSize,
        compressedSize,
        valueCount,
        nullCount,
        rowCount,
        dataEncoding,
        rlByteLength,
        dlByteLength,
        compressed,
        (int) crc.getValue(),
        tempOutputStream,
        headerBlockEncryptor,
        dataPageHeaderAAD);
  } else {
    parquetMetadataConverter.writeDataPageV2Header(
        uncompressedSize,
        compressedSize,
        valueCount,
        nullCount,
        rowCount,
        dataEncoding,
        rlByteLength,
        dlByteLength,
        compressed,
        tempOutputStream,
        headerBlockEncryptor,
        dataPageHeaderAAD);
  }
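
  // Accumulate per-chunk totals that are reported when the column chunk is finalized.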
  this.uncompressedLength += uncompressedSize;
  this.compressedLength += compressedSize;
  this.totalValueCount += valueCount;
  this.pageCount += 1;

  mergeColumnStatistics(statistics, sizeStatistics);
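
  // Record the full page size (header + levels + data) and row count in the offset index.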
  offsetIndexBuilder.add(
      toIntWithCheck((long) tempOutputStream.size() + compressedSize),
      rowCount,
      sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty());

  // by concatenating before collecting instead of collecting twice,
  // we only allocate one buffer to copy into instead of multiple.
  buf.collect(BytesInput.concat(
      BytesInput.from(tempOutputStream), repetitionLevels, definitionLevels, compressedData));
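  // Track the encodings used by this chunk for the column chunk metadata.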
  dataEncodings.add(dataEncoding);
}