banyand/tsdb/buffer.go (235 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package tsdb
import (
"fmt"
"sync"
"time"
"unsafe"
"github.com/dgraph-io/badger/v3/skl"
"github.com/dgraph-io/badger/v3/y"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/meter"
"github.com/apache/skywalking-banyandb/pkg/run"
)
const (
defaultSize = 1 << 20 // 1MB
nodeAlign = int(unsafe.Sizeof(uint64(0))) - 1
)
var (
bufferMeterProvider meter.Provider
maxBytes meter.Gauge
mutableBytes meter.Gauge
)
func init() {
bufferMeterProvider = observability.NewMeterProvider(meterTSDB.SubScope("buffer"))
labelNames := append(common.LabelNames(), "bucket")
maxBytes = bufferMeterProvider.Gauge("max_bytes", labelNames...)
mutableBytes = bufferMeterProvider.Gauge("mutable_bytes", labelNames...)
}
type operation struct {
key []byte
value []byte
epoch uint64
}
type flushEvent struct {
data *skl.Skiplist
}
type onFlush func(shardIndex int, skl *skl.Skiplist) error
type bufferShardBucket struct {
mutable *skl.Skiplist
writeCh chan operation
flushCh chan flushEvent
writeWaitGroup *sync.WaitGroup
flushWaitGroup *sync.WaitGroup
log *logger.Logger
immutables []*skl.Skiplist
labelValues []string
shardLabelValues []string
index int
capacity int
mutex sync.RWMutex
}
// Buffer is an exported struct that represents a buffer composed of multiple shard buckets.
type Buffer struct {
onFlushFn onFlush
entryCloser *run.Closer
log *logger.Logger
buckets []bufferShardBucket
writeWaitGroup sync.WaitGroup
flushWaitGroup sync.WaitGroup
numShards int
closerOnce sync.Once
}
// NewBuffer creates a new Buffer instance with the given parameters.
func NewBuffer(log *logger.Logger, position common.Position, flushSize, writeConcurrency, numShards int, onFlushFn onFlush) (*Buffer, error) {
buckets := make([]bufferShardBucket, numShards)
buffer := &Buffer{
buckets: buckets,
numShards: numShards,
onFlushFn: onFlushFn,
entryCloser: run.NewCloser(1),
log: log.Named("buffer"),
}
buffer.writeWaitGroup.Add(numShards)
buffer.flushWaitGroup.Add(numShards)
for i := 0; i < numShards; i++ {
buckets[i] = bufferShardBucket{
index: i,
capacity: flushSize,
mutable: skl.NewSkiplist(int64(flushSize)),
writeCh: make(chan operation, writeConcurrency),
flushCh: make(chan flushEvent, 1),
writeWaitGroup: &buffer.writeWaitGroup,
flushWaitGroup: &buffer.flushWaitGroup,
log: buffer.log.Named(fmt.Sprintf("shard-%d", i)),
labelValues: append(position.LabelValues(), fmt.Sprintf("%d", i)),
shardLabelValues: position.ShardLabelValues(),
}
buckets[i].start(onFlushFn)
maxBytes.Set(float64(flushSize), buckets[i].labelValues...)
}
return buffer, nil
}
// Write adds a key-value pair with a timestamp to the appropriate shard bucket in the buffer.
func (b *Buffer) Write(key, value []byte, timestamp time.Time) {
if b == nil || !b.entryCloser.AddRunning() {
return
}
defer b.entryCloser.Done()
index := b.getShardIndex(key)
if b.log.Debug().Enabled() {
b.log.Debug().Uint64("shard", index).Bytes("key", key).
Time("ts", timestamp).Msg("route a shard")
}
b.buckets[index].writeCh <- operation{key: key, value: value, epoch: uint64(timestamp.UnixNano())}
}
// Read retrieves the value associated with the given key and timestamp from the appropriate shard bucket in the buffer.
func (b *Buffer) Read(key []byte, ts time.Time) ([]byte, bool) {
if b == nil || !b.entryCloser.AddRunning() {
return nil, false
}
defer b.entryCloser.Done()
keyWithTS := y.KeyWithTs(key, uint64(ts.UnixNano()))
index := b.getShardIndex(key)
epoch := uint64(ts.UnixNano())
ll, deferFn := b.buckets[index].getAll()
defer deferFn()
for _, bk := range ll {
value := bk.Get(keyWithTS)
if value.Meta == 0 && value.Value == nil {
continue
}
if value.Version == epoch {
return value.Value, true
}
}
return nil, false
}
// Close gracefully closes the Buffer and ensures that all pending operations are completed.
func (b *Buffer) Close() error {
if b == nil {
return nil
}
b.closerOnce.Do(func() {
b.entryCloser.Done()
b.entryCloser.CloseThenWait()
for i := 0; i < b.numShards; i++ {
close(b.buckets[i].writeCh)
}
b.writeWaitGroup.Wait()
for i := 0; i < b.numShards; i++ {
if err := b.onFlushFn(i, b.buckets[i].mutable); err != nil {
b.buckets[i].log.Err(err).Msg("flushing mutable buffer failed")
}
b.buckets[i].mutable.DecrRef()
}
for i := 0; i < b.numShards; i++ {
close(b.buckets[i].flushCh)
}
b.flushWaitGroup.Wait()
})
return nil
}
func (b *Buffer) getShardIndex(key []byte) uint64 {
return convert.Hash(key) % uint64(b.numShards)
}
func (bsb *bufferShardBucket) getAll() ([]*skl.Skiplist, func()) {
bsb.mutex.RLock()
defer bsb.mutex.RUnlock()
allList := make([]*skl.Skiplist, len(bsb.immutables)+1)
bsb.mutable.IncrRef()
allList[0] = bsb.mutable
last := len(bsb.immutables) - 1
for i := range bsb.immutables {
allList[i+1] = bsb.immutables[last-i]
bsb.immutables[last-i].IncrRef()
}
return allList, func() {
for _, l := range allList {
l.DecrRef()
}
}
}
func (bsb *bufferShardBucket) start(onFlushFn onFlush) {
go func() {
defer func() {
for _, g := range []meter.Gauge{maxBytes, mutableBytes} {
g.Delete(bsb.labelValues...)
}
}()
defer bsb.flushWaitGroup.Done()
for event := range bsb.flushCh {
oldSkipList := event.data
memSize := oldSkipList.MemSize()
t1 := time.Now()
for {
if err := onFlushFn(bsb.index, oldSkipList); err != nil {
bsb.log.Err(err).Msg("flushing immutable buffer failed. Retrying...")
flushNum.Inc(1, append(bsb.labelValues[:2], "true")...)
time.Sleep(time.Second)
continue
}
break
}
flushLatency.Observe(time.Since(t1).Seconds(), bsb.shardLabelValues...)
flushBytes.Inc(float64(memSize), bsb.shardLabelValues...)
flushNum.Inc(1, append(bsb.shardLabelValues, "false")...)
bsb.mutex.Lock()
if len(bsb.immutables) > 0 {
bsb.immutables = bsb.immutables[1:]
}
bsb.mutex.Unlock()
oldSkipList.DecrRef()
}
}()
go func() {
defer bsb.writeWaitGroup.Done()
volume := 0
for op := range bsb.writeCh {
k := y.KeyWithTs(op.key, op.epoch)
v := y.ValueStruct{Value: op.value}
volume += len(k) + int(v.EncodedSize()) + skl.MaxNodeSize + nodeAlign
memSize := bsb.mutable.MemSize()
mutableBytes.Set(float64(memSize), bsb.labelValues...)
if volume >= bsb.capacity || memSize >= int64(bsb.capacity) {
bsb.triggerFlushing()
volume = 0
}
bsb.mutable.Put(k, v)
}
}()
}
func (bsb *bufferShardBucket) triggerFlushing() {
for {
select {
case bsb.flushCh <- flushEvent{data: bsb.mutable}:
bsb.mutex.Lock()
defer bsb.mutex.Unlock()
bsb.swap()
return
default:
}
time.Sleep(10 * time.Second)
}
}
func (bsb *bufferShardBucket) swap() {
bsb.immutables = append(bsb.immutables, bsb.mutable)
bsb.mutable = skl.NewSkiplist(int64(bsb.capacity))
}