in parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java [154:230]
public void writePage(BytesInput bytes,
                      int valueCount,
                      int rowCount,
                      Statistics statistics,
                      Encoding rlEncoding,
                      Encoding dlEncoding,
                      Encoding valuesEncoding) throws IOException {
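  // Advance the page ordinal; it is mixed into the page and header AADs below when encryption is enabled.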
  pageOrdinal++;
  long uncompressedSize = bytes.size();
  if (uncompressedSize > Integer.MAX_VALUE || uncompressedSize < 0) {
    throw new ParquetEncodingException(
        "Cannot write page larger than Integer.MAX_VALUE or negative bytes: " +
            uncompressedSize);
  }
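  // Compress the page body and, if column encryption is enabled, encrypt it with an AAD bound to this page ordinal.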
  BytesInput compressedBytes = compressor.compress(bytes);
  if (null != pageBlockEncryptor) {
    AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
    compressedBytes = BytesInput.from(pageBlockEncryptor.encrypt(compressedBytes.toByteArray(), dataPageAAD));
  }
  long compressedSize = compressedBytes.size();
  if (compressedSize > Integer.MAX_VALUE) {
    throw new ParquetEncodingException(
        "Cannot write compressed page larger than Integer.MAX_VALUE bytes: "
            + compressedSize);
  }
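  // Serialize the data page V1 header into tempOutputStream; it carries a CRC32 of the
  // compressed (and possibly encrypted) bytes when page write checksums are enabled.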
  tempOutputStream.reset();
  if (null != headerBlockEncryptor) {
    AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
  }
  if (pageWriteChecksumEnabled) {
    crc.reset();
    crc.update(compressedBytes.toByteArray());
    parquetMetadataConverter.writeDataPageV1Header(
        (int) uncompressedSize,
        (int) compressedSize,
        valueCount,
        rlEncoding,
        dlEncoding,
        valuesEncoding,
        (int) crc.getValue(),
        tempOutputStream,
        headerBlockEncryptor,
        dataPageHeaderAAD);
  } else {
    parquetMetadataConverter.writeDataPageV1Header(
        (int) uncompressedSize,
        (int) compressedSize,
        valueCount,
        rlEncoding,
        dlEncoding,
        valuesEncoding,
        tempOutputStream,
        headerBlockEncryptor,
        dataPageHeaderAAD);
  }
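  // Accumulate per-chunk totals used later when this column chunk's metadata is written.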
  this.uncompressedLength += uncompressedSize;
  this.compressedLength += compressedSize;
  this.totalValueCount += valueCount;
  this.pageCount += 1;
  // Copy the statistics if they are not initialized yet so that we keep the correctly typed instance
  if (totalStatistics == null) {
    totalStatistics = statistics.copy();
  } else {
    totalStatistics.mergeStatistics(statistics);
  }
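  // Feed the page statistics and its size/row count into the column index and offset index builders.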
  columnIndexBuilder.add(statistics);
  offsetIndexBuilder.add(toIntWithCheck(tempOutputStream.size() + compressedSize), rowCount);
  // by concatenating before collecting instead of collecting twice,
  // we only allocate one buffer to copy into instead of multiple.
  buf.collect(BytesInput.concat(BytesInput.from(tempOutputStream), compressedBytes));
  rlEncodings.add(rlEncoding);
  dlEncodings.add(dlEncoding);
  dataEncodings.add(valuesEncoding);
}
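
For context, a minimal caller-side sketch of how a page reaches this writer through the generic PageWriter interface; PageWriteExample, writeOnePage, and its parameters are hypothetical names for illustration, and a real writer would populate the statistics while encoding the values.

import java.io.IOException;

import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.statistics.Statistics;

// Hypothetical helper, not part of ColumnChunkPageWriteStore.
final class PageWriteExample {

  // Hands one already-encoded, uncompressed page to the store's PageWriter.
  // Compression, optional encryption, header serialization and index
  // bookkeeping then happen inside writePage as shown above.
  static void writeOnePage(PageWriteStore store,
                           ColumnDescriptor column,
                           byte[] encodedPageBody,
                           int valueCount,
                           int rowCount) throws IOException {
    PageWriter pageWriter = store.getPageWriter(column);
    // Placeholder statistics; a real writer accumulates min/max/null counts
    // while the values are encoded.
    Statistics<?> stats = Statistics.createStats(column.getPrimitiveType());
    pageWriter.writePage(
        BytesInput.from(encodedPageBody), // uncompressed page body
        valueCount,
        rowCount,
        stats,
        Encoding.RLE,    // repetition level encoding
        Encoding.RLE,    // definition level encoding
        Encoding.PLAIN); // values encoding
  }
}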