in parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java [680:739]
  /**
   * Writes a single v1 data page for the current column chunk, optionally
   * encrypting the page header and, when page checksums are enabled,
   * embedding a CRC32 of the compressed page body in the header.
   */
  public void writeDataPage(
      int valueCount, int uncompressedPageSize,
      BytesInput bytes,
      Statistics statistics,
      Encoding rlEncoding,
      Encoding dlEncoding,
      Encoding valuesEncoding,
      BlockCipher.Encryptor metadataBlockEncryptor,
      byte[] pageHeaderAAD) throws IOException {
    state = state.write();
    long beforeHeader = out.getPos();
    // Remember where the first data page of this chunk starts; this offset is
    // recorded in the column chunk metadata.
    if (currentChunkFirstDataPage < 0) {
      currentChunkFirstDataPage = beforeHeader;
    }
    LOG.debug("{}: write data page: {} values", beforeHeader, valueCount);
    int compressedPageSize = (int) bytes.size();
    if (pageWriteChecksumEnabled) {
      // Compute a CRC32 over the compressed page body and write it into the header.
      crc.reset();
      crc.update(bytes.toByteArray());
      metadataConverter.writeDataPageV1Header(
          uncompressedPageSize, compressedPageSize,
          valueCount,
          rlEncoding,
          dlEncoding,
          valuesEncoding,
          (int) crc.getValue(),
          out,
          metadataBlockEncryptor,
          pageHeaderAAD);
    } else {
      metadataConverter.writeDataPageV1Header(
          uncompressedPageSize, compressedPageSize,
          valueCount,
          rlEncoding,
          dlEncoding,
          valuesEncoding,
          out,
          metadataBlockEncryptor,
          pageHeaderAAD);
    }
    // The header counts toward both the uncompressed and compressed chunk sizes.
    long headerSize = out.getPos() - beforeHeader;
    this.uncompressedLength += uncompressedPageSize + headerSize;
    this.compressedLength += compressedPageSize + headerSize;
    LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize);
    bytes.writeAllTo(out);
    // Copy the statistics if they are not initialized yet so we have the
    // correctly typed instance; otherwise merge them into the running chunk stats.
    if (currentStatistics == null) {
      currentStatistics = statistics.copy();
    } else {
      currentStatistics.mergeStatistics(statistics);
    }
    columnIndexBuilder.add(statistics);
    encodingStatsBuilder.addDataEncoding(valuesEncoding);
    currentEncodings.add(rlEncoding);
    currentEncodings.add(dlEncoding);
    currentEncodings.add(valuesEncoding);
  }
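
For context, here is a minimal, hypothetical sketch (not part of the file above) of where this overload sits in the writer lifecycle: a page is written between startColumn and endColumn, inside a startBlock/endBlock pair. It assumes the deprecated ParquetFileWriter(Configuration, MessageType, Path) convenience constructor, uses Statistics.createStats to obtain a correctly typed empty statistics object, and passes null for the encryptor and AAD so the page header is written in plaintext; the schema, page bytes, sizes, and output path are illustrative placeholders.

// Hypothetical usage sketch; names, sizes, and the output path are
// placeholders, not taken from the source above.
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class WriteDataPageSketch {
  public static void main(String[] args) throws Exception {
    MessageType schema = MessageTypeParser.parseMessageType(
        "message m { required int32 id; }");
    ColumnDescriptor col = schema.getColumns().get(0);

    ParquetFileWriter writer = new ParquetFileWriter(
        new Configuration(), schema, new Path("/tmp/sketch.parquet"));

    // One PLAIN-encoded int32 value (little-endian 1); for a required column
    // the v1 page body carries no repetition/definition level bytes.
    BytesInput pageBytes = BytesInput.from(new byte[] {1, 0, 0, 0});
    Statistics<?> stats = Statistics.createStats(col.getPrimitiveType());

    writer.start();
    writer.startBlock(1); // one record in this row group
    writer.startColumn(col, 1, CompressionCodecName.UNCOMPRESSED);
    // null encryptor and AAD: the page header is written in plaintext.
    writer.writeDataPage(1, 4, pageBytes, stats,
        Encoding.BIT_PACKED, Encoding.BIT_PACKED, Encoding.PLAIN, null, null);
    writer.endColumn();
    writer.endBlock();
    writer.end(new HashMap<>());
  }
}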