banyand/kv/badger.go (278 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package kv import ( "bytes" "errors" "log" "math" "time" "github.com/dgraph-io/badger/v3" "github.com/dgraph-io/badger/v3/banyandb" "github.com/dgraph-io/badger/v3/skl" "github.com/dgraph-io/badger/v3/y" "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/logger" ) var ( _ Store = (*badgerDB)(nil) _ IndexStore = (*badgerDB)(nil) _ y.Iterator = (*mergedIter)(nil) _ TimeSeriesStore = (*badgerTSS)(nil) bitMergeEntry byte = 1 << 3 // ErrKeyNotFound denotes the expected key can not be got from the kv service. ErrKeyNotFound = badger.ErrKeyNotFound ) type badgerTSS struct { badger.TSet db *badger.DB dbOpts badger.Options } func (b *badgerTSS) Handover(skl *skl.Skiplist) error { return b.db.HandoverIterator(skl.NewUniIterator(false)) } func (b *badgerTSS) Close() error { if b.db != nil && !b.db.IsClosed() { return b.db.Close() } return nil } func (b *badgerTSS) SizeOnDisk() int64 { lsmSize, vlogSize := b.db.Size() return lsmSize + vlogSize } func (b *badgerTSS) CollectStats() *badger.Statistics { return b.db.CollectStats() } type mergedIter struct { delegated Iterator data []byte valid bool } func (i *mergedIter) Next() { i.delegated.Next() i.parseData() } func (i *mergedIter) Rewind() { i.delegated.Rewind() i.parseData() } func (i *mergedIter) Seek(key []byte) { i.delegated.Seek(y.KeyWithTs(key, math.MaxInt64)) } func (i *mergedIter) Key() []byte { return y.KeyWithTs(i.delegated.Key(), uint64(time.Now().UnixNano())) } func (i *mergedIter) Valid() bool { return i.valid } func (i *mergedIter) parseData() { i.data = nil i.valid = i.delegated.Valid() if !i.valid { return } i.data = i.delegated.Val() } func (i *mergedIter) Close() error { i.data = nil i.valid = false return i.delegated.Close() } func (i mergedIter) Value() y.ValueStruct { return y.ValueStruct{ Value: i.data, Meta: bitMergeEntry, } } type badgerDB struct { db *badger.DB dbOpts badger.Options } func (b *badgerDB) Scan(prefix, seekKey []byte, opt ScanOpts, f ScanFunc) error { opts := badger.DefaultIteratorOptions opts.PrefetchSize = opt.PrefetchSize opts.PrefetchValues = opt.PrefetchValues opts.Reverse = opt.Reverse it := b.db.NewIterator(opts) defer func() { _ = it.Close() }() for it.Seek(seekKey); it.Valid(); it.Next() { k := y.ParseKey(it.Key()) if len(k) < len(seekKey) { continue } if !bytes.Equal(prefix, k[0:len(prefix)]) { continue } err := f(k, func() ([]byte, error) { return y.Copy(it.Value().Value), nil }) if errors.Is(err, errStopScan) { break } if err != nil { return err } } return nil } func (b *badgerDB) SizeOnDisk() int64 { lsmSize, vlogSize := b.db.Size() return lsmSize + vlogSize } var _ Iterator = (*iterator)(nil) type iterator struct { delegated y.Iterator reverse bool } func (i *iterator) Next() { i.delegated.Next() } func (i *iterator) Rewind() { i.delegated.Rewind() } func (i *iterator) Seek(key []byte) { if i.reverse { i.delegated.Seek(y.KeyWithTs(key, 0)) } else { i.delegated.Seek(y.KeyWithTs(key, math.MaxInt64)) } } func (i *iterator) Key() []byte { return y.ParseKey(i.delegated.Key()) } func (i *iterator) RawKey() []byte { return i.delegated.Key() } func (i *iterator) Val() []byte { return y.Copy(i.delegated.Value().Value) } func (i *iterator) Valid() bool { return i.delegated.Valid() } func (i *iterator) Close() error { return i.delegated.Close() } func (b *badgerDB) NewIterator(opt ScanOpts) Iterator { opts := badger.DefaultIteratorOptions opts.PrefetchSize = opt.PrefetchSize opts.PrefetchValues = opt.PrefetchValues opts.Reverse = opt.Reverse opts.Prefix = opt.Prefix it := b.db.NewIterator(opts) return &iterator{ delegated: it, reverse: opts.Reverse, } } func (b *badgerDB) Close() error { if b.db != nil && !b.db.IsClosed() { return b.db.Close() } return nil } func (b *badgerDB) Put(key, val []byte) error { return b.db.Put(y.KeyWithTs(key, math.MaxInt64), val) } func (b *badgerDB) PutWithVersion(key, val []byte, version uint64) error { return b.db.Put(y.KeyWithTs(key, version), val) } func (b *badgerDB) Get(key []byte) ([]byte, error) { v, err := b.db.Get(y.KeyWithTs(key, math.MaxInt64)) if errors.Is(err, badger.ErrKeyNotFound) { return nil, ErrKeyNotFound } if err != nil { return nil, err } return v.Value, nil } func (b *badgerDB) GetAll(key []byte, applyFn func([]byte) error) error { iter := b.db.NewIterator(badger.DefaultIteratorOptions) var count int for iter.Seek(y.KeyWithTs(key, math.MaxUint64)); iter.Valid(); iter.Next() { if !bytes.Equal(y.ParseKey(iter.Key()), key) { break } count++ err := applyFn(y.Copy(iter.Value().Value)) if err != nil { return err } } if count > 0 { return nil } return ErrKeyNotFound } // badgerLog delegates the zap log to the badger logger. type badgerLog struct { *log.Logger delegated *logger.Logger } func (l *badgerLog) Errorf(f string, v ...interface{}) { l.delegated.Error().Msgf(f, v...) } func (l *badgerLog) Warningf(f string, v ...interface{}) { l.delegated.Warn().Msgf(f, v...) } func (l *badgerLog) Infof(f string, v ...interface{}) { l.delegated.Info().Msgf(f, v...) } func (l *badgerLog) Debugf(f string, v ...interface{}) { l.delegated.Debug().Msgf(f, v...) } var _ banyandb.SeriesEncoderPool = (*encoderPoolDelegate)(nil) type encoderPoolDelegate struct { encoding.SeriesEncoderPool } func (e *encoderPoolDelegate) Get(metadata []byte, buffer banyandb.Buffer) banyandb.SeriesEncoder { encoder := e.SeriesEncoderPool.Get(metadata, buffer) if encoder == nil { return nil } return &encoderDelegate{SeriesEncoder: encoder} } func (e *encoderPoolDelegate) Put(encoder banyandb.SeriesEncoder) { ee := encoder.(*encoderDelegate) e.SeriesEncoderPool.Put(ee.SeriesEncoder) } var _ banyandb.SeriesEncoder = (*encoderDelegate)(nil) type encoderDelegate struct { encoding.SeriesEncoder } func (e *encoderDelegate) Reset(key []byte, buffer banyandb.Buffer) { e.SeriesEncoder.Reset(key, &bufferDelegate{BufferWriter: buffer}) } var _ encoding.BufferWriter = (*bufferDelegate)(nil) type bufferDelegate struct { encoding.BufferWriter } var _ banyandb.SeriesDecoderPool = (*decoderPoolDelegate)(nil) type decoderPoolDelegate struct { encoding.SeriesDecoderPool } func (e *decoderPoolDelegate) Get(metadata []byte) banyandb.SeriesDecoder { if decoder := e.SeriesDecoderPool.Get(metadata); decoder != nil { return &decoderDelegate{ e.SeriesDecoderPool.Get(metadata), } } return nil } func (e *decoderPoolDelegate) Put(decoder banyandb.SeriesDecoder) { if decoder != nil { dd := decoder.(*decoderDelegate) e.SeriesDecoderPool.Put(dd.SeriesDecoder) } } var _ banyandb.SeriesDecoder = (*decoderDelegate)(nil) type decoderDelegate struct { encoding.SeriesDecoder } func (d *decoderDelegate) Iterator() banyandb.SeriesIterator { return &iterDelegate{ SeriesIterator: d.SeriesDecoder.Iterator(), } } type iterDelegate struct { encoding.SeriesIterator }