banyand/internal/storage/tsdb.go

// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package storage

import (
	"context"
	"path/filepath"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/pkg/errors"

	"github.com/apache/skywalking-banyandb/api/common"
	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
	"github.com/apache/skywalking-banyandb/banyand/observability"
	"github.com/apache/skywalking-banyandb/pkg/fs"
	"github.com/apache/skywalking-banyandb/pkg/index/inverted"
	"github.com/apache/skywalking-banyandb/pkg/logger"
	"github.com/apache/skywalking-banyandb/pkg/timestamp"
)

// IndexGranularity denotes the granularity of the local index.
type IndexGranularity int

// The options of the local index granularity.
const (
	IndexGranularityBlock IndexGranularity = iota
	IndexGranularitySeries
)

const (
	lockFilename = "lock"
)

// TSDBOpts wraps options to create a tsdb.
type TSDBOpts[T TSTable, O any] struct {
	Option                         O
	TableMetrics                   Metrics
	TSTableCreator                 TSTableCreator[T, O]
	StorageMetricsFactory          *observability.Factory
	Location                       string
	SegmentInterval                IntervalRule
	TTL                            IntervalRule
	SeriesIndexFlushTimeoutSeconds int64
	SeriesIndexCacheMaxBytes       int
	ShardNum                       uint32
	DisableRetention               bool
	SegmentIdleTimeout             time.Duration
}

type (
	segmentID uint32
)

// generateSegID packs the interval unit into bit 31 and the time suffix into
// the low 31 bits; the shift-left-then-right clears the suffix's top bit so
// it cannot collide with the unit bit.
func generateSegID(unit IntervalUnit, suffix int) segmentID {
	return segmentID(unit)<<31 | ((segmentID(suffix) << 1) >> 1)
}

type database[T TSTable, O any] struct {
	lock              fs.File
	logger            *logger.Logger
	scheduler         *timestamp.Scheduler
	tsEventCh         chan int64
	segmentController *segmentController[T, O]
	*metrics
	p              common.Position
	location       string
	latestTickTime atomic.Int64
	sync.RWMutex
	rotationProcessOn atomic.Bool
	closed            atomic.Bool
	disableRetention  bool
}

func (d *database[T, O]) Close() error {
	if d.closed.Load() {
		return nil
	}
	d.closed.Store(true)
	d.Lock()
	defer d.Unlock()
	d.scheduler.Close()
	close(d.tsEventCh)
	d.segmentController.close()
	d.lock.Close()
	if err := lfs.DeleteFile(d.lock.Path()); err != nil {
		logger.Panicf("cannot delete lock file %s: %s", d.lock.Path(), err)
	}
	return nil
}
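// A minimal sketch of opening a tsdb via OpenTSDB below, assuming concrete
// TSTable and option types named myTable and myOpts (hypothetical names, not
// part of this package; other fields such as TSTableCreator are omitted for
// brevity):
//
//	db, err := OpenTSDB[*myTable, myOpts](ctx, TSDBOpts[*myTable, myOpts]{
//		Location:        "/tmp/tsdb",
//		SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
//		TTL:             IntervalRule{Unit: DAY, Num: 7},
//		ShardNum:        2,
//	})
//	if err != nil {
//		// handle the open failure
//	}
//	defer db.Close()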
// OpenTSDB returns a new tsdb runtime. This constructor will create a new database if it's absent,
// or load an existing one.
func OpenTSDB[T TSTable, O any](ctx context.Context, opts TSDBOpts[T, O]) (TSDB[T, O], error) {
	if opts.SegmentInterval.Num == 0 {
		return nil, errors.Wrap(errOpenDatabase, "segment interval is absent")
	}
	if opts.TTL.Num == 0 {
		return nil, errors.Wrap(errOpenDatabase, "ttl is absent")
	}
	p := common.GetPosition(ctx)
	location := filepath.Clean(opts.Location)
	lfs.MkdirIfNotExist(location, DirPerm)
	l := logger.Fetch(ctx, p.Database)
	clock, _ := timestamp.GetClock(ctx)
	scheduler := timestamp.NewScheduler(l, clock)
	var indexMetrics *inverted.Metrics
	if opts.StorageMetricsFactory != nil {
		indexMetrics = inverted.NewMetrics(opts.StorageMetricsFactory, common.SegLabelNames()...)
	}
	db := &database[T, O]{
		location:          location,
		scheduler:         scheduler,
		logger:            l,
		tsEventCh:         make(chan int64),
		p:                 p,
		segmentController: newSegmentController(ctx, location, l, opts, indexMetrics, opts.TableMetrics, opts.SegmentIdleTimeout),
		metrics:           newMetrics(opts.StorageMetricsFactory),
		disableRetention:  opts.DisableRetention,
	}
	db.logger.Info().Str("path", opts.Location).Msg("initialized")
	lockPath := filepath.Join(opts.Location, lockFilename)
	lock, err := lfs.CreateLockFile(lockPath, FilePerm)
	if err != nil {
		logger.Panicf("cannot create lock file %s: %s", lockPath, err)
	}
	db.lock = lock
	if err := db.segmentController.open(); err != nil {
		return nil, err
	}
	observability.MetricsCollector.Register(location, db.collect)
	return db, db.startRotationTask()
}

func (d *database[T, O]) CreateSegmentIfNotExist(ts time.Time) (Segment[T, O], error) {
	if d.closed.Load() {
		return nil, errors.New("database is closed")
	}
	return d.segmentController.createSegment(ts)
}

func (d *database[T, O]) SelectSegments(timeRange timestamp.TimeRange) ([]Segment[T, O], error) {
	if d.closed.Load() {
		return nil, nil
	}
	return d.segmentController.selectSegments(timeRange)
}

func (d *database[T, O]) UpdateOptions(resourceOpts *commonv1.ResourceOpts) {
	if d.closed.Load() {
		return
	}
	d.segmentController.updateOptions(resourceOpts)
}

// TakeFileSnapshot hard-links each segment's metadata into dst and asks the
// series index and every shard table to snapshot their files alongside it.
func (d *database[T, O]) TakeFileSnapshot(dst string) error {
	if d.closed.Load() {
		return errors.New("database is closed")
	}
	segments, err := d.segmentController.segments(true)
	if err != nil {
		return errors.Wrap(err, "failed to get segments")
	}
	defer func() {
		for _, seg := range segments {
			seg.DecRef()
		}
	}()
	for _, seg := range segments {
		segDir := filepath.Base(seg.location)
		segPath := filepath.Join(dst, segDir)
		lfs.MkdirIfNotExist(segPath, DirPerm)
		metadataSrc := filepath.Join(seg.location, metadataFilename)
		metadataDest := filepath.Join(segPath, metadataFilename)
		if err := lfs.CreateHardLink(metadataSrc, metadataDest, nil); err != nil {
			return errors.Wrapf(err, "failed to snapshot metadata for segment %s", segDir)
		}
		indexPath := filepath.Join(segPath, seriesIndexDirName)
		lfs.MkdirIfNotExist(indexPath, DirPerm)
		if err := seg.index.store.TakeFileSnapshot(indexPath); err != nil {
			return errors.Wrapf(err, "failed to snapshot index for segment %s", segDir)
		}
		sLst := seg.sLst.Load()
		if sLst == nil {
			continue
		}
		for _, shard := range *sLst {
			shardDir := filepath.Base(shard.location)
			shardPath := filepath.Join(segPath, shardDir)
			lfs.MkdirIfNotExist(shardPath, DirPerm)
			if err := shard.table.TakeFileSnapshot(shardPath); err != nil {
				return errors.Wrapf(err, "failed to snapshot shard %s in segment %s", shardDir, segDir)
			}
		}
	}
	return nil
}

func (d *database[T, O]) GetExpiredSegmentsTimeRange() *timestamp.TimeRange {
	return d.segmentController.getExpiredSegmentsTimeRange()
}
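// A retention pass typically pairs the call above with DeleteExpiredSegments
// below: fetch the expired range first, then delete it. A minimal usage
// sketch, assuming a db value of this type (hypothetical usage, not part of
// this file):
//
//	if tr := db.GetExpiredSegmentsTimeRange(); tr != nil {
//		removed := db.DeleteExpiredSegments(*tr)
//		db.logger.Info().Int64("removed", removed).Msg("retention pass")
//	}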
func (d *database[T, O]) DeleteExpiredSegments(timeRange timestamp.TimeRange) int64 {
	return d.segmentController.deleteExpiredSegments(timeRange)
}

// collect reports the latest tick time, per-table and per-index metrics, the
// total segment reference count, and scheduler job metrics.
func (d *database[T, O]) collect() {
	if d.closed.Load() {
		return
	}
	if d.metrics == nil {
		return
	}
	d.metrics.lastTickTime.Set(float64(d.latestTickTime.Load()))
	refCount := int32(0)
	ss, _ := d.segmentController.segments(false)
	for _, s := range ss {
		for _, t := range s.Tables() {
			t.Collect(d.segmentController.metrics)
		}
		s.index.store.CollectMetrics(s.index.p.SegLabelValues()...)
		s.DecRef()
		refCount += atomic.LoadInt32(&s.refCount)
	}
	d.totalSegRefs.Set(float64(refCount))
	metrics := d.scheduler.Metrics()
	for job, m := range metrics {
		d.metrics.schedulerMetrics.Collect(job, m)
	}
}

type walkFn func(suffix string) error

// walkDir calls wf with the text after the last '-' in the name of each
// directory under root whose name starts with prefix.
func walkDir(root, prefix string, wf walkFn) error {
	for _, f := range lfs.ReadDir(root) {
		if !f.IsDir() || !strings.HasPrefix(f.Name(), prefix) {
			continue
		}
		segs := strings.Split(f.Name(), "-")
		errWalk := wf(segs[len(segs)-1])
		if errWalk != nil {
			return errors.WithMessagef(errWalk, "failed to load: %s", f.Name())
		}
	}
	return nil
}
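// A minimal usage sketch of walkDir for enumerating segment directories,
// assuming they are named like "seg-20240101" (illustrative prefix and
// layout, not asserted by this file):
//
//	_ = walkDir(location, "seg", func(suffix string) error {
//		println("segment suffix:", suffix) // e.g. "20240101"
//		return nil
//	})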