banyand/kv/kv.go (221 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Package kv implements a key-value engine.
package kv
import (
"fmt"
"io"
"math"
"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/badger/v3/options"
"github.com/dgraph-io/badger/v3/skl"
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
var (
errStopScan = errors.New("stop scanning")
// DefaultScanOpts is a helper to provides canonical options for scanning.
DefaultScanOpts = ScanOpts{
PrefetchSize: 100,
PrefetchValues: true,
}
defaultKVMemorySize = 4 << 20
)
type writer interface {
// Put a value
Put(key, val []byte) error
PutWithVersion(key, val []byte, version uint64) error
}
// ScanFunc is the closure executed on scanning out a pair of key-value.
type ScanFunc func(key []byte, getVal func() ([]byte, error)) error
// ScanOpts wraps options for scanning the kv storage.
type ScanOpts struct {
Prefix []byte
PrefetchSize int
PrefetchValues bool
Reverse bool
}
// Reader allows retrieving data from kv.
type Reader interface {
Iterable
// Get a value by its key
Get(key []byte) ([]byte, error)
GetAll(key []byte, applyFn func([]byte) error) error
Scan(prefix, seekKey []byte, opt ScanOpts, f ScanFunc) error
}
// Store is a common kv storage with auto-generated key.
type Store interface {
io.Closer
writer
Reader
SizeOnDisk() int64
}
// TimeSeriesReader allows retrieving data from a time-series storage.
type TimeSeriesReader interface {
// Get a value by its key and timestamp/version
Get(key []byte, ts uint64) ([]byte, error)
}
// TimeSeriesStore is time series storage.
type TimeSeriesStore interface {
io.Closer
Handover(skl *skl.Skiplist) error
TimeSeriesReader
SizeOnDisk() int64
CollectStats() *badger.Statistics
}
// TimeSeriesOptions sets an options for creating a TimeSeriesStore.
type TimeSeriesOptions func(TimeSeriesStore)
// TSSWithLogger sets a external logger into underlying TimeSeriesStore.
func TSSWithLogger(l *logger.Logger) TimeSeriesOptions {
return func(store TimeSeriesStore) {
if btss, ok := store.(*badgerTSS); ok {
btss.dbOpts = btss.dbOpts.WithLogger(&badgerLog{
delegated: l.Named("ts-kv"),
})
}
}
}
// TSSWithEncoding sets encoding and decoding pools for building chunks.
func TSSWithEncoding(encoderPool encoding.SeriesEncoderPool, decoderPool encoding.SeriesDecoderPool, chunkSize int) TimeSeriesOptions {
return func(store TimeSeriesStore) {
if btss, ok := store.(*badgerTSS); ok {
btss.dbOpts = btss.dbOpts.
WithKeyBasedEncoder(
&encoderPoolDelegate{
encoderPool,
}, &decoderPoolDelegate{
decoderPool,
}, chunkSize).
WithSameKeyBlock()
}
}
}
// TSSWithZSTDCompression sets a ZSTD based compression method.
func TSSWithZSTDCompression(chunkSize int) TimeSeriesOptions {
return func(store TimeSeriesStore) {
if btss, ok := store.(*badgerTSS); ok {
btss.dbOpts = btss.dbOpts.
WithCompression(options.ZSTD).
WithBlockSize(chunkSize).
WithZSTDCompressionLevel(3)
}
}
}
// TSSWithMemTableSize sets the size of memory table in bytes.
func TSSWithMemTableSize(sizeInBytes int64) TimeSeriesOptions {
return func(store TimeSeriesStore) {
if sizeInBytes < 1 {
return
}
if btss, ok := store.(*badgerTSS); ok {
btss.dbOpts.MemTableSize = sizeInBytes
}
}
}
// Iterator allows iterating the kv tables.
// TODO: use generic to provide a unique iterator.
type Iterator interface {
Next()
Rewind()
Seek(key []byte)
Key() []byte
RawKey() []byte
Val() []byte
Valid() bool
Close() error
}
// Iterable allows creating a Iterator.
type Iterable interface {
NewIterator(opt ScanOpts) Iterator
}
// IndexStore allows writing and reading index format data.
type IndexStore interface {
Iterable
Reader
Close() error
SizeOnDisk() int64
}
// OpenTimeSeriesStore creates a new TimeSeriesStore.
// nolint: contextcheck
func OpenTimeSeriesStore(path string, options ...TimeSeriesOptions) (TimeSeriesStore, error) {
btss := new(badgerTSS)
btss.dbOpts = badger.DefaultOptions(path)
for _, opt := range options {
opt(btss)
}
// Put all values into LSM
btss.dbOpts = btss.dbOpts.
WithNumVersionsToKeep(math.MaxUint32).
WithVLogPercentile(1.0).
WithInTable().
WithMaxLevels(2).
WithBaseTableSize(10 << 20).
WithBaseLevelSize(math.MaxInt64)
if btss.dbOpts.MemTableSize < int64(defaultKVMemorySize) {
btss.dbOpts.MemTableSize = int64(defaultKVMemorySize)
}
if btss.dbOpts.MemTableSize < 8<<20 {
btss.dbOpts = btss.dbOpts.WithValueThreshold(1 << 10)
}
btss.dbOpts.LmaxCompaction = true
var err error
btss.db, err = badger.Open(btss.dbOpts)
if err != nil {
return nil, fmt.Errorf("failed to open time series store: %w", err)
}
btss.TSet = *badger.NewTSet(btss.db)
return btss, nil
}
// StoreOptions sets options for creating Store.
type StoreOptions func(Store)
// StoreWithLogger sets a external logger into underlying Store.
func StoreWithLogger(l *logger.Logger) StoreOptions {
return StoreWithNamedLogger("normal-kv", l)
}
// StoreWithNamedLogger sets a external logger with a name into underlying Store.
func StoreWithNamedLogger(name string, l *logger.Logger) StoreOptions {
return func(store Store) {
if bdb, ok := store.(*badgerDB); ok {
bdb.dbOpts = bdb.dbOpts.WithLogger(&badgerLog{
delegated: l.Named(name),
})
}
}
}
// StoreWithMemTableSize sets MemTable size.
func StoreWithMemTableSize(size int64) StoreOptions {
return func(store Store) {
if size < 1 {
return
}
if bdb, ok := store.(*badgerDB); ok {
bdb.dbOpts = bdb.dbOpts.WithMemTableSize(size)
}
}
}
// OpenStore creates a new Store.
// nolint: contextcheck
func OpenStore(path string, opts ...StoreOptions) (Store, error) {
bdb := new(badgerDB)
bdb.dbOpts = badger.DefaultOptions(path)
for _, opt := range opts {
opt(bdb)
}
if bdb.dbOpts.MemTableSize < int64(defaultKVMemorySize) {
bdb.dbOpts.MemTableSize = int64(defaultKVMemorySize)
}
if bdb.dbOpts.MemTableSize < 8<<20 {
bdb.dbOpts = bdb.dbOpts.WithValueThreshold(1 << 10)
}
bdb.dbOpts = bdb.dbOpts.
WithBaseTableSize(5 << 20).
WithBaseLevelSize(25 << 20).
WithCompression(options.ZSTD).
WithZSTDCompressionLevel(1)
var err error
bdb.db, err = badger.Open(bdb.dbOpts)
if err != nil {
return nil, fmt.Errorf("failed to open normal store: %w", err)
}
return bdb, nil
}
// IndexOptions sets options for creating the index store.
type IndexOptions func(store IndexStore)
// IndexWithLogger sets a external logger into underlying IndexStore.
func IndexWithLogger(l *logger.Logger) IndexOptions {
return func(store IndexStore) {
if bdb, ok := store.(*badgerDB); ok {
bdb.dbOpts = bdb.dbOpts.WithLogger(&badgerLog{
delegated: l.Named("index-kv"),
})
}
}
}
// OpenIndexStore creates a new IndexStore.
func OpenIndexStore(path string, options ...IndexOptions) (IndexStore, error) {
bdb := new(badgerDB)
bdb.dbOpts = badger.DefaultOptions(path)
for _, opt := range options {
opt(bdb)
}
bdb.dbOpts = bdb.dbOpts.WithNumVersionsToKeep(math.MaxUint32).
WithNumCompactors(2).
WithMemTableSize(2 << 20).
WithMaxLevels(2).
WithBaseTableSize(2 << 20).
WithBaseLevelSize(math.MaxInt64).
WithValueThreshold(1 << 10)
var err error
bdb.db, err = badger.Open(bdb.dbOpts)
if err != nil {
return nil, fmt.Errorf("failed to index store: %w", err)
}
return bdb, nil
}