banyand/tsdb/tsdb.go (310 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // Package tsdb implements a time-series-based storage engine. // It provides: // - Partition data based on a time axis. // - Sharding data based on a series id which represents a unique entity of stream/measure // - Retrieving data based on index.Filter. // - Cleaning expired data, or the data retention. package tsdb import ( "context" "fmt" "io" "os" "strconv" "strings" "sync" "sync/atomic" "github.com/pkg/errors" "go.uber.org/multierr" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) const ( shardPathPrefix = "shard" pathSeparator = string(os.PathSeparator) rootPrefix = "%s" + pathSeparator shardTemplate = rootPrefix + shardPathPrefix + "-%d" seriesTemplate = rootPrefix + "series" segPathPrefix = "seg" segTemplate = rootPrefix + segPathPrefix + "-%s" blockPathPrefix = "block" blockTemplate = rootPrefix + blockPathPrefix + "-%s" globalIndexTemplate = rootPrefix + "index" hourFormat = "2006010215" dayFormat = "20060102" dirPerm = 0o700 ) var ( // ErrUnknownShard indicates that the shard is not found. ErrUnknownShard = errors.New("unknown shard") errOpenDatabase = errors.New("fails to open the database") // OptionsKey is the key of options in context. OptionsKey = contextOptionsKey{} meterStorage = observability.RootScope.SubScope("storage") meterTSDB = meterStorage.SubScope("tsdb") ) type contextOptionsKey struct{} // Supplier allows getting a tsdb's runtime. type Supplier interface { SupplyTSDB() Database } // Database allows listing and getting shard details. type Database interface { io.Closer CreateShardsAndGetByID(id common.ShardID) (Shard, error) Shards() []Shard Shard(id common.ShardID) (Shard, error) } // Shard allows accessing data of tsdb. type Shard interface { io.Closer ID() common.ShardID Series() SeriesDatabase Index() IndexDatabase State() ShardState // Only works with MockClock TriggerSchedule(task string) bool } // IndexGranularity denotes the granularity of the local index. type IndexGranularity int // The options of the local index granularity. const ( IndexGranularityBlock IndexGranularity = iota IndexGranularitySeries ) var _ Database = (*database)(nil) // DatabaseOpts wraps options to create a tsdb. type DatabaseOpts struct { TSTableFactory TSTableFactory Location string SegmentInterval IntervalRule BlockInterval IntervalRule TTL IntervalRule BlockInvertedIndex InvertedIndexOpts SeriesMemSize run.Bytes GlobalIndexMemSize run.Bytes ShardNum uint32 EnableGlobalIndex bool IndexGranularity IndexGranularity } // InvertedIndexOpts wraps options to create the block inverted index. type InvertedIndexOpts struct { BatchWaitSec int64 } // EncodingMethod wraps encoder/decoder pools to flush/compact data on disk. type EncodingMethod struct { EncoderPool encoding.SeriesEncoderPool DecoderPool encoding.SeriesDecoderPool ChunkSizeInBytes int } // CompressionMethod denotes how to compress a single chunk. type CompressionMethod struct { Type CompressionType ChunkSizeInBytes int } // CompressionType specifies how a chunk should be compressed. type CompressionType int const ( // CompressionTypeNone mode indicates that a chunk is not compressed. CompressionTypeNone CompressionType = iota // CompressionTypeZSTD mode indicates that a chunk is compressed using CompressionTypeZSTD algorithm. CompressionTypeZSTD ) type ( // SectionID is the kind of a block/segment. SectionID uint32 // BlockID is the identity of a block in a shard. BlockID struct { SegID SectionID BlockID SectionID } ) func (b BlockID) String() string { return fmt.Sprintf("BlockID-%d-%d", parseSuffix(b.SegID), parseSuffix(b.BlockID)) } // GenerateInternalID returns a identity of a section(segment or block) based on IntervalRule. func GenerateInternalID(unit IntervalUnit, suffix int) SectionID { return SectionID(unit)<<31 | ((SectionID(suffix) << 1) >> 1) } func parseSuffix(id SectionID) int { return int((id << 1) >> 1) } func sectionIDToBytes(id SectionID) []byte { return convert.Uint32ToBytes(uint32(id)) } func readSectionID(data []byte, offset int) (SectionID, int) { end := offset + 4 return SectionID(convert.BytesToUint32(data[offset:end])), end } // BlockState is a sample of a block's runtime state. type BlockState struct { TimeRange timestamp.TimeRange ID BlockID Closed bool } // ShardState is a sample of a shard's runtime state. type ShardState struct { Blocks []BlockState OpenBlocks []BlockID StrategyManagers []string } type database struct { shardCreationCtx context.Context logger *logger.Logger location string sLst []Shard segmentSize IntervalRule blockSize IntervalRule ttl IntervalRule sync.RWMutex shardNum uint32 shardCreationState uint32 } func (d *database) CreateShardsAndGetByID(id common.ShardID) (Shard, error) { if atomic.LoadUint32(&d.shardCreationState) != 0 { return d.shard(id) } d.Lock() defer d.Unlock() if atomic.LoadUint32(&d.shardCreationState) != 0 { return d.shard(id) } loadedShardsNum := len(d.sLst) if loadedShardsNum < int(d.shardNum) { _, err := createDatabase(d, loadedShardsNum) if err != nil { return nil, errors.WithMessage(err, "create the database failed") } } atomic.StoreUint32(&d.shardCreationState, 1) return d.shard(id) } func (d *database) Shards() []Shard { d.RLock() defer d.RUnlock() return d.sLst } func (d *database) Shard(id common.ShardID) (Shard, error) { d.RLock() defer d.RUnlock() return d.shard(id) } func (d *database) shard(id common.ShardID) (Shard, error) { if uint(id) >= uint(len(d.sLst)) { return nil, ErrUnknownShard } return d.sLst[id], nil } func (d *database) Close() error { d.Lock() defer d.Unlock() var err error for _, s := range d.sLst { innerErr := s.Close() if innerErr != nil { err = multierr.Append(err, innerErr) } } return err } // OpenDatabase returns a new tsdb runtime. This constructor will create a new database if it's absent, // or load an existing one. func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) { if _, err := mkdir(opts.Location); err != nil { return nil, err } if opts.SegmentInterval.Num == 0 { return nil, errors.Wrap(errOpenDatabase, "segment interval is absent") } if opts.BlockInterval.Num == 0 { return nil, errors.Wrap(errOpenDatabase, "block interval is absent") } if opts.BlockInterval.estimatedDuration() > opts.SegmentInterval.estimatedDuration() { return nil, errors.Wrapf(errOpenDatabase, "the block size is bigger than the segment size") } if opts.TTL.Num == 0 { return nil, errors.Wrap(errOpenDatabase, "ttl is absent") } p := common.GetPosition(ctx) db := &database{ location: opts.Location, shardNum: opts.ShardNum, logger: logger.Fetch(ctx, p.Database), segmentSize: opts.SegmentInterval, blockSize: opts.BlockInterval, ttl: opts.TTL, } db.logger.Info().Str("path", opts.Location).Msg("initialized") var entries []os.DirEntry var err error if entries, err = os.ReadDir(opts.Location); err != nil { return nil, errors.Wrap(err, "failed to read directory contents failed") } thisContext := context.WithValue(ctx, logger.ContextKey, db.logger) thisContext = context.WithValue(thisContext, OptionsKey, opts) db.shardCreationCtx = thisContext if len(entries) > 0 { return loadDatabase(thisContext, db) } return db, nil } func createDatabase(db *database, startID int) (Database, error) { var err error for i := startID; i < int(db.shardNum); i++ { db.logger.Info().Int("shard_id", i).Msg("creating a shard") so, errNewShard := OpenShard(db.shardCreationCtx, common.ShardID(i), db.location, db.segmentSize, db.blockSize, db.ttl, defaultBlockQueueSize, defaultMaxBlockQueueSize) if errNewShard != nil { err = multierr.Append(err, errNewShard) continue } db.sLst = append(db.sLst, so) } return db, err } func loadDatabase(ctx context.Context, db *database) (Database, error) { // TODO: open the lock file // TODO: open the manifest file db.Lock() defer db.Unlock() err := walkDir(db.location, shardPathPrefix, func(suffix string) error { shardID, err := strconv.Atoi(suffix) if err != nil { return err } if shardID >= int(db.shardNum) { return nil } db.logger.Info().Int("shard_id", shardID).Msg("opening a existing shard") so, errOpenShard := OpenShard( context.WithValue(ctx, logger.ContextKey, db.logger), common.ShardID(shardID), db.location, db.segmentSize, db.blockSize, db.ttl, defaultBlockQueueSize, defaultMaxBlockQueueSize, ) if errOpenShard != nil { return errOpenShard } db.sLst = append(db.sLst, so) return nil }) if err != nil { return nil, errors.WithMessage(err, "load the database failed") } return db, nil } type walkFn func(suffix string) error func walkDir(root, prefix string, wf walkFn) error { files, err := os.ReadDir(root) if err != nil { return errors.Wrapf(err, "failed to walk the database path: %s", root) } for _, f := range files { if !f.IsDir() || !strings.HasPrefix(f.Name(), prefix) { continue } segs := strings.Split(f.Name(), "-") errWalk := wf(segs[len(segs)-1]) if errWalk != nil { return errors.WithMessagef(errWalk, "failed to load: %s", f.Name()) } } return nil } func mkdir(format string, a ...interface{}) (path string, err error) { path = fmt.Sprintf(format, a...) if err = os.MkdirAll(path, dirPerm); err != nil { return "", errors.Wrapf(err, "failed to create %s", path) } return path, err } func mkdirIfNotExist(format string, a ...interface{}) (err error) { path := fmt.Sprintf(format, a...) if _, err = os.Stat(path); errors.Is(err, os.ErrNotExist) { if err = os.MkdirAll(path, dirPerm); err != nil { return errors.Wrapf(err, "failed to create %s", path) } } else { return os.ErrExist } return err }