in parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java [1432:1504]
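/**
 * Writes one complete column chunk: the optional dictionary page, the already
 * serialized and compressed data pages in {@code bytes}, and the accumulated
 * bookkeeping (encodings, statistics, bloom filter, column/offset indexes)
 * that endColumn() records in the row group's footer metadata.
 */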
void writeColumnChunk(
        ColumnDescriptor descriptor,
        long valueCount,
        CompressionCodecName compressionCodecName,
        DictionaryPage dictionaryPage,
        BytesInput bytes,
        long uncompressedTotalPageSize,
        long compressedTotalPageSize,
        Statistics<?> totalStats,
        SizeStatistics totalSizeStats,
        ColumnIndexBuilder columnIndexBuilder,
        OffsetIndexBuilder offsetIndexBuilder,
        BloomFilter bloomFilter,
        Set<Encoding> rlEncodings,
        Set<Encoding> dlEncodings,
        List<Encoding> dataEncodings,
        BlockCipher.Encryptor headerBlockEncryptor,
        int rowGroupOrdinal,
        int columnOrdinal,
        byte[] fileAAD)
        throws IOException {
    startColumn(descriptor, valueCount, compressionCodecName);
    state = state.write();
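    // Write the dictionary page first, if there is one. For encrypted files, build
    // the AAD (additional authenticated data) for the dictionary page header; the
    // page ordinal is -1 because dictionary pages are not numbered as data pages.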
    if (dictionaryPage != null) {
        byte[] dictionaryPageHeaderAAD = null;
        if (null != headerBlockEncryptor) {
            dictionaryPageHeaderAAD = AesCipher.createModuleAAD(
                    fileAAD, ModuleType.DictionaryPageHeader, rowGroupOrdinal, columnOrdinal, -1);
        }
        writeDictionaryPage(dictionaryPage, headerBlockEncryptor, dictionaryPageHeaderAAD);
    }
    if (bloomFilter != null) {
        // Write the bloom filter only if at least one data page is not dictionary
        // encoded: a fully dictionary-encoded column already enumerates its distinct
        // values in the dictionary, so a bloom filter would be redundant.
        boolean isWriteBloomFilter = false;
        for (Encoding encoding : dataEncodings) {
            // dictionary encoding: `PLAIN_DICTIONARY` is used in parquet v1, `RLE_DICTIONARY` is used in parquet v2
            if (encoding != Encoding.PLAIN_DICTIONARY && encoding != Encoding.RLE_DICTIONARY) {
                isWriteBloomFilter = true;
                break;
            }
        }
        if (isWriteBloomFilter) {
            currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter);
        } else {
            LOG.info(
                    "No need to write bloom filter because column {} data pages are all encoded as dictionary.",
                    descriptor.getPath());
        }
    }
LOG.debug("{}: write data pages", out.getPos());
    long headersSize = bytes.size() - compressedTotalPageSize;
    this.uncompressedLength += uncompressedTotalPageSize + headersSize;
    this.compressedLength += compressedTotalPageSize + headersSize;
    LOG.debug("{}: write data pages content", out.getPos());
    currentChunkFirstDataPage = out.getPos();
    bytes.writeAllTo(out);
    encodingStatsBuilder.addDataEncodings(dataEncodings);
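    // DataPageV2 always RLE-encodes the repetition levels and does not report an
    // RL encoding here, so an empty set indicates that the pages are v2.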
    if (rlEncodings.isEmpty()) {
        encodingStatsBuilder.withV2Pages();
    }
    currentEncodings.addAll(rlEncodings);
    currentEncodings.addAll(dlEncodings);
    currentEncodings.addAll(dataEncodings);
    currentStatistics = totalStats;
    currentSizeStatistics = totalSizeStats;
    this.columnIndexBuilder = columnIndexBuilder;
    this.offsetIndexBuilder = offsetIndexBuilder;
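    // Seal the chunk: endColumn() turns the state accumulated above into the
    // column chunk metadata of the current row group.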
    endColumn();
}