banyand/tsdb/block.go (416 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package tsdb import ( "context" "fmt" "io" "os" "path" "runtime" "strconv" "strings" "sync" "sync/atomic" "time" "unsafe" "github.com/dgraph-io/badger/v3" "github.com/pkg/errors" "go.uber.org/multierr" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/banyand/kv" "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/inverted" "github.com/apache/skywalking-banyandb/pkg/index/lsm" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/meter" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) const ( componentSecondInvertedIdx = "inverted" componentSecondLSMIdx = "lsm" defaultEnqueueTimeout = 500 * time.Millisecond itemIDLength = unsafe.Sizeof(common.ItemID(0)) ) var ( blockMeterProvider = observability.NewMeterProvider(meterTSDB.SubScope("block")) blockOpenedTimeSecondsGauge = blockMeterProvider.Gauge("opened_time_seconds", common.LabelNames()...) blockReferencesGauge = blockMeterProvider.Gauge("refs", common.LabelNames()...) blockCompressedSize = blockMeterProvider.Gauge("compressed_size", append(common.LabelNames(), "from", "to")...) blockUncompressedSize = blockMeterProvider.Gauge("uncompressed_size", append(common.LabelNames(), "from", "to")...) blockCompressedBlockNum = blockMeterProvider.Gauge("compressed_block_num", append(common.LabelNames(), "from", "to")...) blockCompressedEntryNum = blockMeterProvider.Gauge("compressed_entry_num", append(common.LabelNames(), "from", "to")...) blockEncodedSize = blockMeterProvider.Gauge("encoded_size", append(common.LabelNames(), "from", "to")...) blockUnencodedSize = blockMeterProvider.Gauge("unencoded_size", append(common.LabelNames(), "from", "to")...) blockEncodedBlockNum = blockMeterProvider.Gauge("encoded_block_num", append(common.LabelNames(), "from", "to")...) blockEncodedEntryNum = blockMeterProvider.Gauge("encoded_entry_num", append(common.LabelNames(), "from", "to")...) blockCompactionMap = map[badger.TableBuilderSizeKeyType]meter.Gauge{ badger.TableBuilderSizeKeyCompressedSize: blockCompressedSize, badger.TableBuilderSizeKeyUncompressedSize: blockUncompressedSize, badger.TableBuilderSizeKeyCompressedBlockNum: blockCompressedBlockNum, badger.TableBuilderSizeKeyCompressedEntryNum: blockCompressedEntryNum, badger.TableBuilderSizeKeyEncodedSize: blockEncodedSize, badger.TableBuilderSizeKeyUnEncodedSize: blockUnencodedSize, badger.TableBuilderSizeKeyEncodedBlockNum: blockEncodedBlockNum, badger.TableBuilderSizeKeyEncodedEntryNum: blockEncodedEntryNum, } errBlockClosingInterrupted = errors.New("interrupt to close the block") ) type block struct { openOpts openOpts invertedIndex index.Store tsTable TSTable queue bucket.Queue bucket.Reporter clock timestamp.Clock lsmIndex index.Store closeBufferTimer *time.Timer closed *atomic.Bool ref *atomic.Int32 l *logger.Logger deleted *atomic.Bool position common.Position timestamp.TimeRange segSuffix string suffix string path string closableLst []io.Closer lock sync.RWMutex segID SectionID blockID SectionID } type openOpts struct { tsTableFactory TSTableFactory inverted *inverted.StoreOpts lsm lsm.StoreOpts } type blockOpts struct { queue bucket.Queue scheduler *timestamp.Scheduler timeRange timestamp.TimeRange segSuffix string suffix string path string blockSize IntervalRule segID SectionID } func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) { suffixInteger, err := strconv.Atoi(opts.suffix) if err != nil { return nil, err } id := GenerateInternalID(opts.blockSize.Unit, suffixInteger) clock, _ := timestamp.GetClock(ctx) b = &block{ segID: opts.segID, segSuffix: opts.segSuffix, suffix: opts.suffix, blockID: id, path: opts.path, TimeRange: opts.timeRange, clock: clock, ref: &atomic.Int32{}, closed: &atomic.Bool{}, deleted: &atomic.Bool{}, queue: opts.queue, position: common.GetPosition(ctx), } l := logger.Fetch(ctx, b.String()) b.openOpts, err = options(ctx, opts.path, l) if err != nil { return nil, err } b.l = l b.Reporter = bucket.NewTimeBasedReporter(b.String(), opts.timeRange, clock, opts.scheduler) b.closed.Store(true) return b, nil } func options(ctx context.Context, root string, l *logger.Logger) (openOpts, error) { var options DatabaseOpts o := ctx.Value(OptionsKey) if o == nil { return openOpts{}, errors.New("database options not found") } options = o.(DatabaseOpts) var opts openOpts if options.IndexGranularity == IndexGranularityBlock { opts.inverted = &inverted.StoreOpts{ Path: path.Join(root, componentSecondInvertedIdx), Logger: l.Named(componentSecondInvertedIdx), BatchWaitSec: options.BlockInvertedIndex.BatchWaitSec, } } opts.lsm = lsm.StoreOpts{ Path: path.Join(root, componentSecondLSMIdx), Logger: l.Named(componentSecondLSMIdx), MemTableSize: defaultKVMemorySize, } opts.tsTableFactory = options.TSTableFactory if opts.tsTableFactory == nil { return opts, errors.New("ts table factory is nil") } return opts, nil } func (b *block) openSafely() (err error) { if b.deleted.Load() || !b.Closed() { return nil } b.lock.Lock() defer b.lock.Unlock() if !b.Closed() { return } return b.open() } func (b *block) open() (err error) { if b.tsTable, err = b.openOpts.tsTableFactory.NewTSTable(BlockExpiryTracker{ttl: b.End, clock: b.clock}, b.path, b.position, b.l); err != nil { return err } b.closableLst = append(b.closableLst, b.tsTable) if b.openOpts.inverted != nil { if b.invertedIndex, err = inverted.NewStore(*b.openOpts.inverted); err != nil { return err } b.closableLst = append(b.closableLst, b.invertedIndex) } if b.lsmIndex, err = lsm.NewStore(b.openOpts.lsm); err != nil { return err } b.closableLst = append(b.closableLst, b.lsmIndex) b.ref.Store(0) b.closed.Store(false) blockOpenedTimeSecondsGauge.Set(float64(time.Now().Unix()), b.position.LabelValues()...) plv := b.position.LabelValues() observability.MetricsCollector.Register(strings.Join(plv, "-"), func() { stats := b.tsTable.CollectStats() stats.TableBuilderSize.Range(func(label interface{}, value interface{}) bool { key := label.(badger.TableBuilderSizeKey) counter := value.(*atomic.Int64) from, to := getLevelLabels(key.FromLevel, key.ToLevel) labelValues := append(b.position.LabelValues(), from, to) if block, ok := blockCompactionMap[key.Type]; ok { block.Set(float64(counter.Load()), labelValues...) } return true }) }) return nil } func (b *block) delegate(ctx context.Context) (blockDelegate, error) { if b.deleted.Load() { return nil, errors.WithMessagef(errBlockAbsent, "block %s is deleted", b) } blockID := BlockID{ BlockID: b.blockID, SegID: b.segID, } if b.incRef() { b.queue.Touch(blockID) return &bDelegate{ delegate: b, }, nil } b.lock.Lock() defer b.lock.Unlock() if err := b.queue.Push(ctx, blockID, func() error { if b.deleted.Load() || !b.Closed() { return nil } return b.open() }); err != nil { b.l.Error().Err(err).Stringer("block", b).Msg("fail to open block") return nil, err } b.incRef() return &bDelegate{ delegate: b, }, nil } func (b *block) incRef() bool { if b.Closed() { return false } b.ref.Add(1) blockReferencesGauge.Set(float64(b.ref.Load()), b.position.LabelValues()...) return true } func (b *block) Done() { b.ref.Add(-1) blockReferencesGauge.Set(float64(b.ref.Load()), b.position.LabelValues()...) } func (b *block) waitDone(stopped *atomic.Bool) <-chan struct{} { ch := make(chan struct{}) go func() { loop: if b.ref.Load() < 1 { b.ref.Store(0) close(ch) return } if stopped.Load() { close(ch) return } runtime.Gosched() goto loop }() return ch } func (b *block) close(ctx context.Context) (err error) { b.lock.Lock() defer b.lock.Unlock() if b.closed.Load() { return nil } stopWaiting := &atomic.Bool{} ch := b.waitDone(stopWaiting) select { case <-ctx.Done(): stopWaiting.Store(true) return errors.Wrapf(errBlockClosingInterrupted, "block:%s", b) case <-ch: } b.closed.Store(true) if b.closeBufferTimer != nil { b.closeBufferTimer.Stop() } plv := b.position.LabelValues() observability.MetricsCollector.Unregister(strings.Join(plv, "-")) stats := b.tsTable.CollectStats() stats.TableBuilderSize.Range(func(label interface{}, value interface{}) bool { key := label.(badger.TableBuilderSizeKey) from, to := getLevelLabels(key.FromLevel, key.ToLevel) labelValues := append(b.position.LabelValues(), from, to) if block, ok := blockCompactionMap[key.Type]; ok { block.Delete(labelValues...) } return true }) for _, closer := range b.closableLst { err = multierr.Append(err, closer.Close()) } for _, g := range []meter.Gauge{blockOpenedTimeSecondsGauge, blockReferencesGauge} { g.Delete(b.position.LabelValues()...) } return err } func (b *block) delete(ctx context.Context) error { if b.deleted.Load() { return nil } b.deleted.Store(true) b.close(ctx) return os.RemoveAll(b.path) } func (b *block) Closed() bool { return b.closed.Load() } func (b *block) String() string { return fmt.Sprintf("BlockID-%s-%s", b.segSuffix, b.suffix) } func (b *block) Get(key []byte, ts uint64) ([]byte, error) { return b.tsTable.Get(key, time.Unix(0, int64(ts))) } type blockDelegate interface { io.Closer contains(ts time.Time) bool write(key []byte, val []byte, ts time.Time) error writePrimaryIndex(field index.Field, id common.ItemID) error writeLSMIndex(fields []index.Field, id common.ItemID) error writeInvertedIndex(fields []index.Field, id common.ItemID) error dataReader() kv.TimeSeriesReader lsmIndexReader() index.Searcher invertedIndexReader() index.Searcher primaryIndexReader() index.FieldIterable identity() (segID SectionID, blockID SectionID) startTime() time.Time String() string } var _ blockDelegate = (*bDelegate)(nil) type bDelegate struct { delegate *block } func (d *bDelegate) dataReader() kv.TimeSeriesReader { return d.delegate } func (d *bDelegate) lsmIndexReader() index.Searcher { return d.delegate.lsmIndex } func (d *bDelegate) invertedIndexReader() index.Searcher { return d.delegate.invertedIndex } func (d *bDelegate) primaryIndexReader() index.FieldIterable { return d.delegate.lsmIndex } func (d *bDelegate) startTime() time.Time { return d.delegate.Start } func (d *bDelegate) identity() (segID SectionID, blockID SectionID) { return d.delegate.segID, d.delegate.blockID } func (d *bDelegate) write(key []byte, val []byte, ts time.Time) error { if err := d.delegate.tsTable.Put(key, val, ts); err != nil { receivedNumCounter.Inc(1, append(d.delegate.position.ShardLabelValues(), "tst", "true")...) return err } receivedBytesCounter.Inc(float64(len(key)+len(val)), append(d.delegate.position.ShardLabelValues(), "tst")...) receivedNumCounter.Inc(1, append(d.delegate.position.ShardLabelValues(), "tst", "false")...) return nil } func (d *bDelegate) writePrimaryIndex(field index.Field, id common.ItemID) error { if err := d.delegate.lsmIndex.Write([]index.Field{field}, uint64(id)); err != nil { receivedNumCounter.Inc(1, append(d.delegate.position.ShardLabelValues(), "primary", "true")...) return err } receivedBytesCounter.Inc(float64(len(field.Marshal())+int(itemIDLength)), append(d.delegate.position.ShardLabelValues(), "primary")...) receivedNumCounter.Inc(1, append(d.delegate.position.ShardLabelValues(), "primary", "false")...) return nil } func (d *bDelegate) writeLSMIndex(fields []index.Field, id common.ItemID) error { total := 0 for _, f := range fields { total += len(f.Marshal()) } if err := d.delegate.lsmIndex.Write(fields, uint64(id)); err != nil { receivedNumCounter.Inc(1, append(d.delegate.position.ShardLabelValues(), "local_lsm", "true")...) return err } receivedBytesCounter.Inc(float64(total+int(itemIDLength)), append(d.delegate.position.ShardLabelValues(), "local_lsm")...) receivedNumCounter.Inc(1, append(d.delegate.position.ShardLabelValues(), "local_lsm", "false")...) return nil } func (d *bDelegate) writeInvertedIndex(fields []index.Field, id common.ItemID) error { if d.delegate.invertedIndex == nil { return errors.New("inverted index is not enabled") } total := 0 for _, f := range fields { total += len(f.Marshal()) } if err := d.delegate.invertedIndex.Write(fields, uint64(id)); err != nil { receivedNumCounter.Inc(1, append(d.delegate.position.ShardLabelValues(), "local_inverted", "true")...) return err } receivedBytesCounter.Inc(float64(total+int(itemIDLength)), append(d.delegate.position.ShardLabelValues(), "local_inverted")...) receivedNumCounter.Inc(1, append(d.delegate.position.ShardLabelValues(), "local_inverted", "false")...) return nil } func (d *bDelegate) contains(ts time.Time) bool { return d.delegate.Contains(uint64(ts.UnixNano())) } func (d *bDelegate) String() string { return d.delegate.String() } func (d *bDelegate) Close() error { d.delegate.Done() return nil } func getLevelLabels(fromLevel, toLevel int) (string, string) { from := fmt.Sprintf("l%d", fromLevel) to := fmt.Sprintf("l%d", toLevel) if fromLevel < 0 { from = "mem" } return from, to }