// parquet/metadata/bloom_filter.go
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metadata
import (
"errors"
"fmt"
"io"
"math"
"sync"
"unsafe"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/bitutil"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/internal/bitutils"
"github.com/apache/arrow-go/v18/parquet"
"github.com/apache/arrow-go/v18/parquet/internal/debug"
"github.com/apache/arrow-go/v18/parquet/internal/encryption"
format "github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet"
"github.com/apache/arrow-go/v18/parquet/internal/thrift"
"github.com/apache/arrow-go/v18/parquet/internal/utils"
"github.com/apache/arrow-go/v18/parquet/schema"
"github.com/cespare/xxhash/v2"
)
const (
	// bytesPerFilterBlock is the size of one split-block bloom filter
	// block: 32 bytes = 256 bits.
	bytesPerFilterBlock = 32
	// bitsSetPerBlock is the number of bits set per inserted value; it is
	// also the length of the salt table below (one salt per bit).
	bitsSetPerBlock = 8
	// a filter must be able to hold at least one block
	minimumBloomFilterBytes = bytesPerFilterBlock
	// currently using 128MB as maximum size, should probably be reconsidered
	maximumBloomFilterBytes = 128 * 1024 * 1024
)
var (
	// salt contains one multiplier per bit set per block, used to derive
	// the bit positions within a block from a value's hash.
	salt = [bitsSetPerBlock]uint32{
		0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
		0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31}
	// default thrift header descriptors for filters produced by this
	// package: xxHash, split-block algorithm, no compression.
	defaultHashStrategy = format.BloomFilterHash{XXHASH: &format.XxHash{}}
	defaultAlgorithm    = format.BloomFilterAlgorithm{BLOCK: &format.SplitBlockAlgorithm{}}
	defaultCompression  = format.BloomFilterCompression{UNCOMPRESSED: &format.Uncompressed{}}
)
// optimalNumBytes returns the optimal bloom filter size in bytes for ndv
// distinct values at the target false-positive probability fpp.
func optimalNumBytes(ndv uint32, fpp float64) uint32 {
	bits := optimalNumBits(ndv, fpp)
	debug.Assert(bitutil.IsMultipleOf8(int64(bits)), "optimal bits should be multiple of 8")
	return bits / 8
}
// optimalNumBits computes the number of bits a split-block bloom filter
// needs to achieve false-positive probability fpp for ndv distinct values.
// The result is clamped to [minimumBloomFilterBytes*8, maximumBloomFilterBytes*8]
// and rounded up to the next power of two.
func optimalNumBits(ndv uint32, fpp float64) uint32 {
	debug.Assert(fpp > 0 && fpp < 1, "false positive prob must be in (0, 1)")
	var (
		// bits-per-filter formula for split-block bloom filters:
		// m = -8 * n / ln(1 - fpp^(1/8))
		m       = -8 * float64(ndv) / math.Log(1-math.Pow(fpp, 1.0/8.0))
		numBits uint32
	)
	// clamp to the maximum number of BITS (maxBytes << 3).
	// (previously this compared against maximumBloomFilterBytes >> 3, i.e.
	// maxBytes/8, which wrongly snapped any m above 16Mbits up to the
	// 1Gbit maximum instead of using m directly)
	if m < 0 || m > maximumBloomFilterBytes<<3 {
		numBits = maximumBloomFilterBytes << 3
	} else {
		numBits = uint32(m)
	}
	// round up to lower bound
	if numBits < minimumBloomFilterBytes<<3 {
		numBits = minimumBloomFilterBytes << 3
	}
	// get next power of 2 if bits is not power of 2
	if (numBits & (numBits - 1)) != 0 {
		numBits = uint32(bitutil.NextPowerOf2(int(numBits)))
	}
	return numBits
}
// Hasher computes 64-bit hashes of raw value bytes for bloom filter
// insertion and lookup.
type Hasher interface {
	// Sum64 returns the 64-bit hash of b.
	Sum64(b []byte) uint64
	// Sum64s returns the 64-bit hash of each element of b, in order.
	Sum64s(b [][]byte) []uint64
}
// xxhasher implements Hasher using xxHash64, the hash function used by
// this package's bloom filters (see defaultHashStrategy).
type xxhasher struct{}

// Sum64 returns the xxHash64 digest of b.
func (xxhasher) Sum64(b []byte) uint64 { return xxhash.Sum64(b) }

// Sum64s returns the xxHash64 digest of each element of b.
func (xxhasher) Sum64s(b [][]byte) []uint64 {
	out := make([]uint64, len(b))
	for i := range b {
		out[i] = xxhash.Sum64(b[i])
	}
	return out
}
// GetHash hashes a single column value v with h.
func GetHash[T parquet.ColumnTypes](h Hasher, v T) uint64 {
	raw := getBytes(v)
	return h.Sum64(raw)
}
// GetHashes hashes every value in vals with h, returning one hash per value.
func GetHashes[T parquet.ColumnTypes](h Hasher, vals []T) []uint64 {
	raw := getBytesSlice(vals)
	return h.Sum64s(raw)
}
// GetSpacedHashes hashes only the valid (non-null) entries of vals, as
// indicated by the validity bitmap starting at validBitsOffset, returning
// one hash per set bit in order.
func GetSpacedHashes[T parquet.ColumnTypes](h Hasher, numValid int64, vals []T, validBits []byte, validBitsOffset int64) []uint64 {
	if numValid == 0 {
		return []uint64{}
	}
	out := make([]uint64, 0, numValid)
	// TODO: replace with bitset run reader pool
	rdr := bitutils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(vals)))
	// hash each contiguous run of valid values in bulk
	for run := rdr.NextRun(); run.Length != 0; run = rdr.NextRun() {
		chunk := vals[run.Pos : run.Pos+run.Length]
		out = append(out, h.Sum64s(getBytesSlice(chunk))...)
	}
	return out
}
// getBytes returns the in-memory byte representation of v for hashing.
// Byte-array and Int96 values alias their existing storage; fixed-size
// scalar types are viewed in place via unsafe, without copying.
func getBytes[T parquet.ColumnTypes](v T) []byte {
	switch v := any(v).(type) {
	case parquet.ByteArray:
		return v
	case parquet.FixedLenByteArray:
		return v
	case parquet.Int96:
		return v[:]
	}
	// fixed-size scalar: reinterpret the value's memory as a byte slice
	// (taking &v forces v to escape, keeping the view valid)
	return unsafe.Slice((*byte)(unsafe.Pointer(&v)), unsafe.Sizeof(v))
}
// getBytesSlice returns one byte view per element of v for bulk hashing.
// Byte-array and Int96 elements alias their own storage; fixed-size
// scalar elements are sub-sliced from a single unsafe byte view over the
// slice's backing array, avoiding a copy per element.
func getBytesSlice[T parquet.ColumnTypes](v []T) [][]byte {
	b := make([][]byte, len(v))
	switch v := any(v).(type) {
	case []parquet.ByteArray:
		for i, vv := range v {
			b[i] = vv
		}
		return b
	case []parquet.FixedLenByteArray:
		for i, vv := range v {
			b[i] = vv
		}
		return b
	case []parquet.Int96:
		for i, vv := range v {
			b[i] = vv[:]
		}
		return b
	}
	// fixed-size scalars: view the whole backing array as bytes, then
	// carve out one sizeof(T)-byte window per element
	var z T
	sz, ptr := int(unsafe.Sizeof(z)), unsafe.SliceData(v)
	raw := unsafe.Slice((*byte)(unsafe.Pointer(ptr)), sz*len(v))
	for i := range b {
		b[i] = raw[i*sz : (i+1)*sz]
	}
	return b
}
// blockSplitBloomFilter is an in-memory parquet split-block bloom filter.
// The bitset is held once in data and additionally viewed as 32-bit words
// through bitset32 (a reinterpreted view of the same bytes, not a copy).
type blockSplitBloomFilter struct {
	// data owns (or aliases) the raw bitset bytes
	data *memory.Buffer
	// bitset32 is data's bytes viewed as uint32 words
	bitset32 []uint32
	hasher   Hasher
	// thrift descriptors serialized into the filter header on write
	algorithm    format.BloomFilterAlgorithm
	hashStrategy format.BloomFilterHash
	compression  format.BloomFilterCompression
}
// getAlg returns the thrift algorithm descriptor for header serialization.
func (b *blockSplitBloomFilter) getAlg() *format.BloomFilterAlgorithm {
	return &b.algorithm
}

// getHashStrategy returns the thrift hash descriptor for header serialization.
func (b *blockSplitBloomFilter) getHashStrategy() *format.BloomFilterHash {
	return &b.hashStrategy
}

// getCompression returns the thrift compression descriptor for header serialization.
func (b *blockSplitBloomFilter) getCompression() *format.BloomFilterCompression {
	return &b.compression
}
// CheckHash reports whether hash may be present in the filter. False
// positives are possible; false negatives are not.
func (b *blockSplitBloomFilter) CheckHash(hash uint64) bool {
	return checkHash(b.bitset32, hash)
}

// CheckBulk checks every hash in hashes, returning one result per hash.
func (b *blockSplitBloomFilter) CheckBulk(hashes []uint64) []bool {
	out := make([]bool, len(hashes))
	checkBulk(b.bitset32, hashes, out)
	return out
}

// InsertHash adds hash to the filter.
func (b *blockSplitBloomFilter) InsertHash(hash uint64) {
	insertHash(b.bitset32, hash)
}

// InsertBulk adds every hash in hashes to the filter.
func (b *blockSplitBloomFilter) InsertBulk(hashes []uint64) {
	insertBulk(b.bitset32, hashes)
}
// Hasher returns the hash implementation used by this filter.
func (b *blockSplitBloomFilter) Hasher() Hasher { return b.hasher }

// Size returns the size of the filter bitset in bytes.
func (b *blockSplitBloomFilter) Size() int64 {
	return 4 * int64(len(b.bitset32))
}

// WriteTo writes the raw bitset to w, encrypting it first when enc is
// non-nil, and returns the number of bytes written.
func (b *blockSplitBloomFilter) WriteTo(w io.Writer, enc encryption.Encryptor) (int, error) {
	if enc == nil {
		return w.Write(b.data.Bytes())
	}
	return enc.Encrypt(w, b.data.Bytes()), nil
}
// NewBloomFilter creates an empty split-block bloom filter of numBytes
// bytes allocated from mem. numBytes is clamped to
// [minimumBloomFilterBytes, min(maxBytes, maximumBloomFilterBytes)] and
// then rounded up to the next power of two.
func NewBloomFilter(numBytes, maxBytes uint32, mem memory.Allocator) BloomFilterBuilder {
	if numBytes < minimumBloomFilterBytes {
		numBytes = minimumBloomFilterBytes
	}
	if maxBytes > maximumBloomFilterBytes {
		maxBytes = maximumBloomFilterBytes
	}
	if numBytes > maxBytes {
		numBytes = maxBytes
	}
	// get next power of 2 if it's not a power of 2
	if (numBytes & (numBytes - 1)) != 0 {
		numBytes = uint32(bitutil.NextPowerOf2(int(numBytes)))
	}
	buf := memory.NewResizableBuffer(mem)
	buf.ResizeNoShrink(int(numBytes))
	// use the package-level default descriptors rather than re-creating
	// identical literals (keeps all writers consistent)
	bf := &blockSplitBloomFilter{
		data:         buf,
		bitset32:     arrow.Uint32Traits.CastFromBytes(buf.Bytes()),
		hasher:       xxhasher{},
		algorithm:    defaultAlgorithm,
		hashStrategy: defaultHashStrategy,
		compression:  defaultCompression,
	}
	// addCleanup is defined elsewhere in the package; presumably releases
	// the buffer when bf is collected — nil here means no pool to return to
	addCleanup(bf, nil)
	return bf
}
// NewBloomFilterFromNDVAndFPP creates an empty split-block bloom filter
// sized for ndv distinct values at false-positive probability fpp,
// capped at maxBytes, allocated from mem.
func NewBloomFilterFromNDVAndFPP(ndv uint32, fpp float64, maxBytes int64, mem memory.Allocator) BloomFilterBuilder {
	numBytes := optimalNumBytes(ndv, fpp)
	// compare in int64: converting maxBytes to uint32 first would wrap
	// for maxBytes >= 2^32 (e.g. 1<<32 previously capped the filter to 0 bytes)
	if int64(numBytes) > maxBytes {
		numBytes = uint32(maxBytes)
	}
	buf := memory.NewResizableBuffer(mem)
	buf.ResizeNoShrink(int(numBytes))
	// use the package-level default descriptors rather than re-creating
	// identical literals (keeps all writers consistent)
	bf := &blockSplitBloomFilter{
		data:         buf,
		bitset32:     arrow.Uint32Traits.CastFromBytes(buf.Bytes()),
		hasher:       xxhasher{},
		algorithm:    defaultAlgorithm,
		hashStrategy: defaultHashStrategy,
		compression:  defaultCompression,
	}
	// addCleanup is defined elsewhere in the package; presumably releases
	// the buffer when bf is collected — nil here means no pool to return to
	addCleanup(bf, nil)
	return bf
}
// BloomFilterBuilder is a bloom filter being constructed during writing:
// values can be inserted and the finished filter serialized.
type BloomFilterBuilder interface {
	// Hasher returns the hash implementation to use for values.
	Hasher() Hasher
	// Size returns the filter bitset size in bytes.
	Size() int64
	// InsertHash adds a single hash to the filter.
	InsertHash(hash uint64)
	// InsertBulk adds a batch of hashes to the filter.
	InsertBulk(hashes []uint64)
	// WriteTo writes the bitset, optionally encrypted, returning bytes written.
	WriteTo(io.Writer, encryption.Encryptor) (int, error)
	// unexported accessors for the thrift header descriptors
	getAlg() *format.BloomFilterAlgorithm
	getHashStrategy() *format.BloomFilterHash
	getCompression() *format.BloomFilterCompression
}
// BloomFilter is a read-only bloom filter loaded from a file: hashes can
// be checked for possible membership.
type BloomFilter interface {
	// Hasher returns the hash implementation matching this filter.
	Hasher() Hasher
	// CheckHash reports whether hash may be present (false positives possible).
	CheckHash(hash uint64) bool
	// Size returns the filter bitset size in bytes.
	Size() int64
}
// TypedBloomFilter wraps a BloomFilter with a typed Check method for a
// specific parquet column type T.
type TypedBloomFilter[T parquet.ColumnTypes] struct {
	BloomFilter
}
// Check reports whether v may have been added to the filter (false
// positives are possible, false negatives are not).
func (b *TypedBloomFilter[T]) Check(v T) bool {
	return b.CheckHash(b.Hasher().Sum64(getBytes(v)))
}
// validateBloomFilterHeader verifies hdr describes a filter this package
// can read: split-block algorithm, xxHash, uncompressed bitset, and a
// byte count within the supported range.
func validateBloomFilterHeader(hdr *format.BloomFilterHeader) error {
	switch {
	case hdr == nil:
		return errors.New("bloom filter header must not be nil")
	case !hdr.Algorithm.IsSetBLOCK():
		return fmt.Errorf("unsupported bloom filter algorithm: %s", hdr.Algorithm)
	case !hdr.Compression.IsSetUNCOMPRESSED():
		return fmt.Errorf("unsupported bloom filter compression: %s", hdr.Compression)
	case !hdr.Hash.IsSetXXHASH():
		return fmt.Errorf("unsupported bloom filter hash strategy: %s", hdr.Hash)
	case hdr.NumBytes < minimumBloomFilterBytes || hdr.NumBytes > maximumBloomFilterBytes:
		return fmt.Errorf("invalid bloom filter size: %d", hdr.NumBytes)
	}
	return nil
}
// BloomFilterReader provides access to the bloom filters of a parquet
// file, one row group at a time.
type BloomFilterReader struct {
	// Input is the file being read
	Input        parquet.ReaderAtSeeker
	FileMetadata *FileMetaData
	Props        *parquet.ReaderProperties
	// FileDecryptor decrypts encrypted filter modules; may be nil for
	// plaintext files
	FileDecryptor encryption.FileDecryptor
	// BufferPool supplies reusable *memory.Buffer scratch buffers
	BufferPool *sync.Pool
}
// RowGroup returns a bloom filter reader scoped to row group i, or an
// error if i is out of range.
func (r *BloomFilterReader) RowGroup(i int) (*RowGroupBloomFilterReader, error) {
	if i < 0 || i >= len(r.FileMetadata.RowGroups) {
		return nil, fmt.Errorf("row group index %d out of range", i)
	}
	meta := r.FileMetadata.RowGroup(i)
	rdr := &RowGroupBloomFilterReader{
		input:          r.Input,
		rgMeta:         meta,
		fileDecryptor:  r.FileDecryptor,
		rgOrdinal:      int16(i),
		bufferPool:     r.BufferPool,
		sourceFileSize: r.FileMetadata.sourceFileSize,
	}
	return rdr, nil
}
// RowGroupBloomFilterReader reads the bloom filters for the columns of a
// single row group.
type RowGroupBloomFilterReader struct {
	input  parquet.ReaderAtSeeker
	rgMeta *RowGroupMetaData
	// fileDecryptor may be nil for plaintext files
	fileDecryptor encryption.FileDecryptor
	// rgOrdinal is this row group's index, used to build decryption AADs
	rgOrdinal int16
	// sourceFileSize bounds section reads at the end of the file
	sourceFileSize int64
	bufferPool     *sync.Pool
}
// GetColumnBloomFilter reads and returns the bloom filter for column i of
// this row group, or (nil, nil) if the column chunk has no bloom filter.
//
// Encrypted columns decrypt the header and bitset as two separate
// modules. Plaintext columns read through a pooled scratch buffer whose
// ownership is transferred to the returned filter.
func (r *RowGroupBloomFilterReader) GetColumnBloomFilter(i int) (BloomFilter, error) {
	if i < 0 || i >= r.rgMeta.NumColumns() {
		return nil, fmt.Errorf("column index %d out of range", i)
	}
	col, err := r.rgMeta.ColumnChunk(i)
	if err != nil {
		return nil, err
	}
	var (
		decryptor encryption.Decryptor
		header    format.BloomFilterHeader
		offset    int64
		// speculative read size used when the metadata does not record the
		// filter's length
		bloomFilterReadSize int32 = 256
	)
	// no recorded offset means no bloom filter was written for this column
	if offset = col.BloomFilterOffset(); offset <= 0 {
		return nil, nil
	}
	if col.BloomFilterLength() > 0 {
		bloomFilterReadSize = col.BloomFilterLength()
	}
	// limit reads to [offset, end of file)
	sectionRdr := io.NewSectionReader(r.input, offset, r.sourceFileSize-offset)
	cryptoMetadata := col.CryptoMetadata()
	if cryptoMetadata != nil {
		// encrypted path: header and bitset are separately-encrypted
		// modules, each requiring its own AAD before decryption
		decryptor, err = encryption.GetColumnMetaDecryptor(cryptoMetadata, r.fileDecryptor)
		if err != nil {
			return nil, err
		}
		encryption.UpdateDecryptor(decryptor, r.rgOrdinal, int16(i),
			encryption.BloomFilterHeaderModule)
		hdr := decryptor.DecryptFrom(sectionRdr)
		if _, err = thrift.DeserializeThrift(&header, hdr); err != nil {
			return nil, err
		}
		if err = validateBloomFilterHeader(&header); err != nil {
			return nil, err
		}
		encryption.UpdateDecryptor(decryptor, r.rgOrdinal, int16(i),
			encryption.BloomFilterBitsetModule)
		bitset := decryptor.DecryptFrom(sectionRdr)
		if len(bitset) != int(header.NumBytes) {
			return nil, fmt.Errorf("wrong length of decrypted bloom filter bitset: %d vs %d",
				len(bitset), header.NumBytes)
		}
		return &blockSplitBloomFilter{
			data:         memory.NewBufferBytes(bitset),
			bitset32:     arrow.Uint32Traits.CastFromBytes(bitset),
			hasher:       xxhasher{},
			algorithm:    *header.Algorithm,
			hashStrategy: *header.Hash,
			compression:  *header.Compression,
		}, nil
	}
	// plaintext path: read header (and possibly the whole filter) into a
	// pooled scratch buffer
	headerBuf := r.bufferPool.Get().(*memory.Buffer)
	headerBuf.ResizeNoShrink(int(bloomFilterReadSize))
	// return the buffer to the pool on any error path; headerBuf is set to
	// nil below once ownership transfers to the returned filter
	defer func() {
		if headerBuf != nil {
			headerBuf.ResizeNoShrink(0)
			r.bufferPool.Put(headerBuf)
		}
	}()
	// NOTE(review): a single Read may legally return fewer bytes than
	// len(headerBuf.Bytes()); io.ReadFull would be more robust here —
	// confirm SectionReader short-read behavior before changing
	if _, err = sectionRdr.Read(headerBuf.Bytes()); err != nil {
		return nil, err
	}
	// remaining = bytes in the buffer past the end of the thrift header
	remaining, err := thrift.DeserializeThrift(&header, headerBuf.Bytes())
	if err != nil {
		return nil, err
	}
	headerSize := len(headerBuf.Bytes()) - int(remaining)
	if err = validateBloomFilterHeader(&header); err != nil {
		return nil, err
	}
	bloomFilterSz := header.NumBytes
	var bitset []byte
	if int(bloomFilterSz)+headerSize <= len(headerBuf.Bytes()) {
		// bloom filter data is entirely contained in the buffer we just read
		bitset = headerBuf.Bytes()[headerSize : headerSize+int(bloomFilterSz)]
	} else {
		// need a larger buffer: copy the filter bytes already read, then
		// read the rest from the file
		buf := r.bufferPool.Get().(*memory.Buffer)
		buf.ResizeNoShrink(int(bloomFilterSz))
		filterBytesInHeader := headerBuf.Len() - headerSize
		if filterBytesInHeader > 0 {
			copy(buf.Bytes(), headerBuf.Bytes()[headerSize:])
		}
		if _, err = sectionRdr.Read(buf.Bytes()[filterBytesInHeader:]); err != nil {
			return nil, err
		}
		bitset = buf.Bytes()
		// scratch header buffer no longer needed; swap ownership so the
		// deferred cleanup and the filter below both track buf instead
		headerBuf.ResizeNoShrink(0)
		r.bufferPool.Put(headerBuf)
		headerBuf = buf
	}
	bf := &blockSplitBloomFilter{
		data:         headerBuf,
		bitset32:     arrow.GetData[uint32](bitset),
		hasher:       xxhasher{},
		algorithm:    *header.Algorithm,
		hashStrategy: *header.Hash,
		compression:  *header.Compression,
	}
	// ownership transferred to bf: prevent the deferred Put, and let
	// addCleanup (defined elsewhere) return the buffer to the pool later
	headerBuf = nil
	addCleanup(bf, r.bufferPool)
	return bf, nil
}
// FileBloomFilterBuilder accumulates the bloom filters of every row group
// in a file being written, then serializes them all via WriteTo.
type FileBloomFilterBuilder struct {
	Schema *schema.Schema
	// Encryptor is non-nil when the file uses column encryption
	Encryptor encryption.FileEncryptor
	// rgMetaBldrs[i] and bloomFilters[i] correspond to the same row group;
	// bloomFilters maps column path -> filter for that row group
	rgMetaBldrs  []*RowGroupMetaDataBuilder
	bloomFilters []map[string]BloomFilterBuilder
}
// AppendRowGroup records a row group's metadata builder together with its
// bloom filters (keyed by column path) for later serialization in WriteTo.
func (f *FileBloomFilterBuilder) AppendRowGroup(rgMeta *RowGroupMetaDataBuilder, filters map[string]BloomFilterBuilder) {
	f.rgMetaBldrs = append(f.rgMetaBldrs, rgMeta)
	f.bloomFilters = append(f.bloomFilters, filters)
}
// WriteTo serializes every accumulated bloom filter to w (thrift header
// followed by the raw bitset) and records each filter's offset and total
// length in the corresponding column chunk metadata.
//
// For encrypted columns the header and bitset are written as two
// separately-encrypted modules with distinct AADs, mirroring the layout
// read back by RowGroupBloomFilterReader.GetColumnBloomFilter.
func (f *FileBloomFilterBuilder) WriteTo(w utils.WriterTell) error {
	if len(f.rgMetaBldrs) == 0 || len(f.bloomFilters) == 0 {
		return nil
	}
	var (
		// hdr is reused across filters; every field is reassigned each iteration
		hdr        format.BloomFilterHeader
		serializer = thrift.NewThriftSerializer()
	)
	for rg, rgMeta := range f.rgMetaBldrs {
		if len(f.bloomFilters[rg]) == 0 {
			continue
		}
		for c, col := range rgMeta.colBuilders {
			colPath := col.column.Path()
			bf, ok := f.bloomFilters[rg][colPath]
			if !ok || bf == nil {
				continue
			}
			// record where this filter starts in the output stream
			offset := w.Tell()
			col.chunk.MetaData.BloomFilterOffset = &offset
			var encryptor encryption.Encryptor
			if f.Encryptor != nil {
				encryptor = f.Encryptor.GetColumnMetaEncryptor(colPath)
			}
			if encryptor != nil {
				// the header-module AAD must be in place before the header
				// is serialized/encrypted
				encryptor.UpdateAad(encryption.CreateModuleAad(
					encryptor.FileAad(), encryption.BloomFilterHeaderModule,
					int16(rg), int16(c), encryption.NonPageOrdinal))
			}
			hdr.NumBytes = int32(bf.Size())
			hdr.Algorithm = bf.getAlg()
			hdr.Hash = bf.getHashStrategy()
			hdr.Compression = bf.getCompression()
			_, err := serializer.Serialize(&hdr, w, encryptor)
			if err != nil {
				return err
			}
			if encryptor != nil {
				// switch the AAD to the bitset module before writing the bits
				encryptor.UpdateAad(encryption.CreateModuleAad(
					encryptor.FileAad(), encryption.BloomFilterBitsetModule,
					int16(rg), int16(c), encryption.NonPageOrdinal))
			}
			if _, err = bf.WriteTo(w, encryptor); err != nil {
				return err
			}
			// recorded length covers header + bitset
			dataWritten := int32(w.Tell() - offset)
			col.chunk.MetaData.BloomFilterLength = &dataWritten
		}
	}
	return nil
}