in parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java [1598:1678]
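/**
 * Reads the Bloom filter for the given column chunk, decrypting it first when the
 * column is encrypted.
 *
 * @param meta the metadata of the column chunk whose Bloom filter should be read
 * @return the Bloom filter, or {@code null} if the chunk has none or the filter is unsupported
 * @throws IOException if an error occurs while reading from the file
 */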
public BloomFilter readBloomFilter(ColumnChunkMetaData meta) throws IOException {
  long bloomFilterOffset = meta.getBloomFilterOffset();
  // A negative offset means no Bloom filter was written for this column chunk.
  if (bloomFilterOffset < 0) {
    return null;
  }

  // Prepare to decrypt the Bloom filter (for encrypted columns)
  BlockCipher.Decryptor bloomFilterDecryptor = null;
  byte[] bloomFilterHeaderAAD = null;
  byte[] bloomFilterBitsetAAD = null;
  if (null != fileDecryptor && !fileDecryptor.plaintextFile()) {
    InternalColumnDecryptionSetup columnDecryptionSetup = fileDecryptor.getColumnSetup(meta.getPath());
    if (columnDecryptionSetup.isEncrypted()) {
      bloomFilterDecryptor = columnDecryptionSetup.getMetaDataDecryptor();
      bloomFilterHeaderAAD = AesCipher.createModuleAAD(
          fileDecryptor.getFileAAD(),
          ModuleType.BloomFilterHeader,
          meta.getRowGroupOrdinal(),
          columnDecryptionSetup.getOrdinal(),
          -1);
      bloomFilterBitsetAAD = AesCipher.createModuleAAD(
          fileDecryptor.getFileAAD(),
          ModuleType.BloomFilterBitset,
          meta.getRowGroupOrdinal(),
          columnDecryptionSetup.getOrdinal(),
          -1);
    }
  }
  // Seek to the Bloom filter offset.
  f.seek(bloomFilterOffset);

  // Get the Bloom filter length from the column metadata (may be unset).
  int bloomFilterLength = meta.getBloomFilterLength();

  // If the length is set, read the Bloom filter header and bitset together in one I/O call.
  // Otherwise, read the header first and then the bitset directly from the stream.
  InputStream in = f;
  if (bloomFilterLength > 0) {
    byte[] headerAndBitSet = new byte[bloomFilterLength];
    f.readFully(headerAndBitSet);
    in = new ByteArrayInputStream(headerAndBitSet);
  }
  BloomFilterHeader bloomFilterHeader;
  try {
    bloomFilterHeader = Util.readBloomFilterHeader(in, bloomFilterDecryptor, bloomFilterHeaderAAD);
  } catch (IOException e) {
    LOG.warn("Unable to read Bloom filter header", e);
    return null;
  }
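  // Sanity-check the header: the bitset size must be positive and no larger than the
  // supported upper bound.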
  int numBytes = bloomFilterHeader.getNumBytes();
  if (numBytes <= 0 || numBytes > BlockSplitBloomFilter.UPPER_BOUND_BYTES) {
    LOG.warn("Invalid Bloom filter bitset size: {}", numBytes);
    return null;
  }
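  // Only block-split Bloom filters using xxHash over an uncompressed bitset are supported.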
  if (!bloomFilterHeader.getHash().isSetXXHASH()
      || !bloomFilterHeader.getAlgorithm().isSetBLOCK()
      || !bloomFilterHeader.getCompression().isSetUNCOMPRESSED()) {
    LOG.warn(
        "Unsupported Bloom filter: algorithm = {}, hash = {}, compression = {}",
        bloomFilterHeader.getAlgorithm(),
        bloomFilterHeader.getHash(),
        bloomFilterHeader.getCompression());
    return null;
  }
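  // Read the bitset: plaintext bytes are read directly, while encrypted bytes are
  // decrypted as a whole module and must match the size declared in the header.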
  byte[] bitset;
  if (null == bloomFilterDecryptor) {
    bitset = new byte[numBytes];
    // InputStream.read may return fewer bytes than requested, so loop until the
    // whole bitset has been read.
    int totalRead = 0;
    while (totalRead < numBytes) {
      int bytesRead = in.read(bitset, totalRead, numBytes - totalRead);
      if (bytesRead < 0) {
        throw new IOException("Reached end of stream before reading the full Bloom filter bitset");
      }
      totalRead += bytesRead;
    }
  } else {
    bitset = bloomFilterDecryptor.decrypt(in, bloomFilterBitsetAAD);
    if (bitset.length != numBytes) {
      throw new ParquetCryptoRuntimeException("Wrong length of decrypted Bloom filter bitset");
    }
  }
  return new BlockSplitBloomFilter(bitset);
}
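
// Usage sketch (illustrative, not part of the original file): assuming `reader` is an
// open ParquetFileReader, the filter of a column chunk can be probed before scanning
// its pages; a null result means no usable Bloom filter exists for that chunk.
//
//   ColumnChunkMetaData chunkMeta =
//       reader.getFooter().getBlocks().get(0).getColumns().get(0);
//   BloomFilter filter = reader.readBloomFilter(chunkMeta);
//   if (filter != null && !filter.findHash(filter.hash(42L))) {
//     // The value 42 is guaranteed absent from this column chunk, so it can be skipped.
//   }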