in parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java [182:256]
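// Writes a single v1 data page: compresses (and optionally encrypts) the page body, serializes the
// page header, appends header + body to the in-memory buffer, and updates the chunk-level totals,
// statistics, and offset index.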
public void writePage(
    BytesInput bytes,
    int valueCount,
    int rowCount,
    Statistics<?> statistics,
    SizeStatistics sizeStatistics,
    Encoding rlEncoding,
    Encoding dlEncoding,
    Encoding valuesEncoding)
    throws IOException {
  pageOrdinal++;
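  // The page size is later cast to int for the page header, so reject values that do not fit in a
  // 32-bit int.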
  long uncompressedSize = bytes.size();
  if (uncompressedSize > Integer.MAX_VALUE || uncompressedSize < 0) {
    throw new ParquetEncodingException(
        "Cannot write page larger than Integer.MAX_VALUE or negative bytes: " + uncompressedSize);
  }
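  // Compress the page body first; if page-level encryption is configured, encrypt the compressed
  // bytes using an AAD advanced to this page's ordinal.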
  BytesInput compressedBytes = compressor.compress(bytes);
  if (null != pageBlockEncryptor) {
    AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
    compressedBytes =
        BytesInput.from(pageBlockEncryptor.encrypt(compressedBytes.toByteArray(), dataPageAAD));
  }
  long compressedSize = compressedBytes.size();
  if (compressedSize > Integer.MAX_VALUE) {
    throw new ParquetEncodingException(
        "Cannot write compressed page larger than Integer.MAX_VALUE bytes: " + compressedSize);
  }
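  // Serialize the data page header into the temporary stream; when header encryption is enabled,
  // the header AAD is advanced to this page's ordinal as well.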
  tempOutputStream.reset();
  if (null != headerBlockEncryptor) {
    AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
  }
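  // With checksums enabled, a CRC computed over the compressed (and possibly encrypted) page body
  // is embedded in the page header; otherwise the header is written without one.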
  if (pageWriteChecksumEnabled) {
    crc.reset();
    crc.update(compressedBytes.toByteArray());
    parquetMetadataConverter.writeDataPageV1Header(
        (int) uncompressedSize,
        (int) compressedSize,
        valueCount,
        rlEncoding,
        dlEncoding,
        valuesEncoding,
        (int) crc.getValue(),
        tempOutputStream,
        headerBlockEncryptor,
        dataPageHeaderAAD);
  } else {
    parquetMetadataConverter.writeDataPageV1Header(
        (int) uncompressedSize,
        (int) compressedSize,
        valueCount,
        rlEncoding,
        dlEncoding,
        valuesEncoding,
        tempOutputStream,
        headerBlockEncryptor,
        dataPageHeaderAAD);
  }
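  // Accumulate chunk-level totals and merge the page statistics into the column's statistics.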
  this.uncompressedLength += uncompressedSize;
  this.compressedLength += compressedSize;
  this.totalValueCount += valueCount;
  this.pageCount += 1;
  mergeColumnStatistics(statistics, sizeStatistics);
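  // Record this page in the offset index: its total on-disk size (header + compressed body), its
  // row count, and, if available, the unencoded byte-array data size.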
  offsetIndexBuilder.add(
      toIntWithCheck(tempOutputStream.size() + compressedSize),
      rowCount,
      sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty());
  // by concatenating before collecting instead of collecting twice,
  // we only allocate one buffer to copy into instead of multiple.
  buf.collect(BytesInput.concat(BytesInput.from(tempOutputStream), compressedBytes));
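  // Track the encodings used by this page; they are reported in the column chunk metadata.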
  rlEncodings.add(rlEncoding);
  dlEncodings.add(dlEncoding);
  dataEncodings.add(valuesEncoding);
}