banyand/tsdb/seriesdb.go (659 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package tsdb import ( "bytes" "context" "errors" "io" "math" "path" "sort" "strings" "sync" "time" "go.uber.org/multierr" "google.golang.org/protobuf/proto" "github.com/apache/skywalking-banyandb/api/common" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/kv" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/inverted" "github.com/apache/skywalking-banyandb/pkg/index/lsm" "github.com/apache/skywalking-banyandb/pkg/index/posting" "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) var ( entityPrefix = []byte("entity_") entityPrefixLen = len(entityPrefix) seriesPrefix = []byte("series_") maxIntBytes = convert.Uint64ToBytes(math.MaxUint64) zeroIntBytes = convert.Uint64ToBytes(0) ) // AnyEntry is the `*` for a regular expression. It could match "any" Entry in an Entity. var AnyEntry = Entry(nil) // Entry is an element in an Entity. type Entry []byte // Entity denotes an identity of a Series. // It defined by Stream or Measure schema. type Entity []Entry // Marshal encodes an Entity to bytes. func (e Entity) Marshal() []byte { data := make([][]byte, len(e)) for i, entry := range e { data[i] = entry } return bytes.Join(data, nil) } // Prepend inserts an Entry before the first Entry as the prefix. func (e Entity) Prepend(entry Entry) Entity { d := e d = append(Entity{entry}, d...) return d } // Copy an Entity deeply. func (e Entity) Copy() Entity { a := make(Entity, len(e)) copy(a, e) return a } // NewEntity return an Entity with an fixed length. func NewEntity(length int) Entity { e := make(Entity, length) for i := 0; i < length; i++ { e[i] = AnyEntry } return e } // EntityValue represents the value of a tag which is a part of an entity. type EntityValue = *modelv1.TagValue // EntityValueToEntry transforms EntityValue to Entry. func EntityValueToEntry(ev EntityValue) (Entry, error) { return pbv1.MarshalTagValue(ev) } // EntityValues is the encoded Entity. type EntityValues []EntityValue // Prepend inserts an EntityValue before the first EntityValue as the prefix. func (evs EntityValues) Prepend(scope EntityValue) EntityValues { return append(EntityValues{scope}, evs...) } // Encode EntityValues to tag values. func (evs EntityValues) Encode() (result []*modelv1.TagValue) { for _, v := range evs { result = append(result, v) } return } // ToEntity transforms EntityValues to Entity. func (evs EntityValues) ToEntity() (result Entity, err error) { for _, v := range evs { entry, errMarshal := EntityValueToEntry(v) if errMarshal != nil { return nil, err } result = append(result, entry) } return } // String outputs the string represent of an EntityValue. func (evs EntityValues) String() string { var strBuilder strings.Builder vv := evs.Encode() for i := 0; i < len(vv); i++ { strBuilder.WriteString(vv[i].String()) if i < len(vv)-1 { strBuilder.WriteString(".") } } return strBuilder.String() } // DecodeEntityValues decodes tag values to EntityValues. func DecodeEntityValues(tvv []*modelv1.TagValue) (result EntityValues) { for _, tv := range tvv { result = append(result, tv) } return } // StrValue returns an EntityValue which wraps a string value. func StrValue(v string) EntityValue { return &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: v}}} } // Int64Value returns an EntityValue which wraps a int64 value. func Int64Value(v int64) EntityValue { return &modelv1.TagValue{Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: v}}} } // MarshalEntityValues encodes EntityValues to bytes. func MarshalEntityValues(evs EntityValues) ([]byte, error) { data := &modelv1.TagFamilyForWrite{} for _, v := range evs { data.Tags = append(data.Tags, v) } return proto.Marshal(data) } // UnmarshalEntityValues decodes EntityValues from bytes. func UnmarshalEntityValues(evs []byte) (result EntityValues, err error) { data := &modelv1.TagFamilyForWrite{} result = make(EntityValues, len(data.Tags)) if err = proto.Unmarshal(evs, data); err != nil { return nil, err } for _, tv := range data.Tags { result = append(result, tv) } return } // Path denotes a expression to match a Series. // It supports the fuzzy matching more than EQ by setting an entry to AnyEntry. type Path struct { prefix []byte seekKey []byte mask []byte template []byte isFull bool offset int } // NewPath return a Path with a matching expression. func NewPath(matchingExpression []Entry) Path { p := Path{ seekKey: make([]byte, 0), mask: make([]byte, 0), template: make([]byte, 0), } var encounterAny bool for _, e := range matchingExpression { if e == nil { encounterAny = true p.mask = append(p.mask, zeroIntBytes...) p.template = append(p.template, zeroIntBytes...) continue } entry := Hash(e) if !encounterAny { p.offset += 8 } p.mask = append(p.mask, maxIntBytes...) p.template = append(p.template, entry...) } if !encounterAny { p.isFull = true } p.extractPrefix() return p } func (p *Path) extractPrefix() { p.prefix = p.template[:p.offset] p.seekKey = p.seekKey[:0] p.seekKey = append(p.seekKey, p.prefix...) for i := 0; i < len(p.template)-p.offset; i++ { p.seekKey = append(p.seekKey, 0) } } func (p Path) prepend(entry Entry) Path { e := Hash(entry) p.template = prepend(p.template, e) p.offset += len(e) p.extractPrefix() p.mask = prepend(p.mask, maxIntBytes) return p } func prepend(src []byte, entry []byte) []byte { dst := make([]byte, len(src)+len(entry)) copy(dst, entry) copy(dst[len(entry):], src) return dst } // OrderBy specifies the order of the result. type OrderBy struct { Index *databasev1.IndexRule Sort modelv1.Sort } // SeriesDatabase allows retrieving series. type SeriesDatabase interface { io.Closer GetByID(id common.SeriesID) (Series, error) Get(key []byte, entityValues EntityValues) (Series, error) List(ctx context.Context, path Path) (SeriesList, error) Search(ctx context.Context, path Path, filter index.Filter, order *OrderBy) (SeriesList, error) SizeOnDisk() int64 writeInvertedIndex(fields []index.Field, seriesID common.SeriesID) error writeLSMIndex(fields []index.Field, seriesID common.SeriesID) error } type blockDatabase interface { shardID() common.ShardID span(ctx context.Context, timeRange timestamp.TimeRange) ([]blockDelegate, error) create(ctx context.Context, ts time.Time) (blockDelegate, error) block(ctx context.Context, id GlobalItemID) (blockDelegate, error) } var ( _ SeriesDatabase = (*seriesDB)(nil) _ blockDatabase = (*seriesDB)(nil) dummySeriesList = make(SeriesList, 0) ) type seriesDB struct { seriesMetadata kv.Store invertedIndex index.Store lsmIndex index.Store l *logger.Logger segCtrl *segmentController position common.Position sync.Mutex sID common.ShardID } func (s *seriesDB) writeInvertedIndex(fields []index.Field, seriesID common.SeriesID) error { if s.invertedIndex == nil { return errors.New("inverted index is not enabled") } return s.invertedIndex.Write(fields, uint64(seriesID)) } func (s *seriesDB) writeLSMIndex(fields []index.Field, seriesID common.SeriesID) error { if s.lsmIndex == nil { return errors.New("lsm index is not enabled") } return s.lsmIndex.Write(fields, uint64(seriesID)) } // nolint: contextcheck func (s *seriesDB) GetByID(id common.SeriesID) (Series, error) { var series string if e := s.l.Debug(); e.Enabled() { var buf bytes.Buffer buf.Write(seriesPrefix) buf.Write(id.Marshal()) data, err := s.seriesMetadata.Get(buf.Bytes()) if err != nil { e.Err(err).Msg("failed to get series id's literal") return newSeries(s.context(), id, "unknown", s), nil } entityValues, err := UnmarshalEntityValues(data) if err != nil { e.Err(err).Msg("malformed series id's literal") return newSeries(s.context(), id, "malformed", s), nil } series = entityValues.String() } return newSeries(s.context(), id, series, s), nil } func (s *seriesDB) block(ctx context.Context, id GlobalItemID) (blockDelegate, error) { seg := s.segCtrl.get(id.segID) if seg == nil { return nil, nil } return seg.blockController.get(ctx, id.blockID) } func (s *seriesDB) shardID() common.ShardID { return s.sID } func (s *seriesDB) Get(key []byte, entityValues EntityValues) (Series, error) { entityKey := prepend(key, entityPrefix) data, err := s.seriesMetadata.Get(entityKey) if errors.Is(err, kv.ErrKeyNotFound) { s.Lock() defer s.Unlock() seriesID := bytesToSeriesID(Hash(key)) encodedData, entityValuesBytes, errDecode := encode(seriesID, entityValues) if errDecode != nil { return nil, errDecode } errDecode = s.seriesMetadata.Put(entityKey, encodedData) if errDecode != nil { receivedNumCounter.Inc(1, append(s.position.ShardLabelValues(), "series", "true")...) return nil, errDecode } receivedBytesCounter.Inc(float64(len(entityKey)+len(encodedData)), append(s.position.ShardLabelValues(), "series")...) receivedNumCounter.Inc(1, append(s.position.ShardLabelValues(), "series", "false")...) var series string if e := s.l.Debug(); e.Enabled() { errDecode = s.seriesMetadata.Put(prepend(seriesID.Marshal(), seriesPrefix), entityValuesBytes) if errDecode != nil { return nil, errDecode } series = entityValues.String() e.Str("series", series). Uint64("series_id", uint64(seriesID)). Msg("create a new series") } return newSeries(s.context(), seriesID, series, s), nil } if err != nil { return nil, err } seriesID, entityValues, err := decode(data) if err != nil { return nil, err } return newSeries(s.context(), seriesID, entityValues.String(), s), nil } func (s *seriesDB) SizeOnDisk() int64 { return s.seriesMetadata.SizeOnDisk() } func encode(seriesID common.SeriesID, evv EntityValues) ([]byte, []byte, error) { data, err := MarshalEntityValues(evv) if err != nil { return nil, nil, err } var buf bytes.Buffer buf.Write(convert.Uint64ToBytes(uint64(seriesID))) buf.Write(data) return buf.Bytes(), data, nil } func decode(value []byte) (common.SeriesID, EntityValues, error) { seriesID := convert.BytesToUint64(value[:8]) entityValues, err := UnmarshalEntityValues(value[8:]) if err != nil { return 0, nil, err } return common.SeriesID(seriesID), entityValues, nil } func (s *seriesDB) List(ctx context.Context, path Path) (SeriesList, error) { prefix := prepend(path.prefix, entityPrefix) l := logger.FetchOrDefault(ctx, "series_database", s.l) if path.isFull { data, err := s.seriesMetadata.Get(prefix) if err != nil && errors.Is(err, kv.ErrKeyNotFound) { if e := l.Debug(); e.Enabled() { e.Hex("path", path.prefix).Msg("doesn't get any series") } return dummySeriesList, nil } if err != nil { return nil, err } seriesID, entityValue, err := decode(data) if err != nil { return nil, err } var series string if e := l.Debug(); e.Enabled() { series = entityValue.String() e.Int("prefix_len", path.offset/8). Str("series", series). Uint64("series_id", uint64(seriesID)). Msg("got a series with a full path") } // nolint: contextcheck return []Series{newSeries(s.context(), seriesID, series, s)}, nil } result := make([]Series, 0) var err error errScan := s.seriesMetadata.Scan(prefix, prepend(path.seekKey, entityPrefix), kv.DefaultScanOpts, func(key []byte, getVal func() ([]byte, error)) error { key = key[entityPrefixLen:] comparableKey := make([]byte, len(key)) // avoid slice out of bound if len(key) > len(path.mask) { return nil } for i, b := range key { comparableKey[i] = path.mask[i] & b } if bytes.Equal(path.template, comparableKey) { data, errGetVal := getVal() if errGetVal != nil { err = multierr.Append(err, errGetVal) return nil } seriesID, entityValue, errDecode := decode(data) if errDecode != nil { err = multierr.Append(err, errDecode) return nil } series := entityValue.String() if e := l.Debug(); e.Enabled() { e.Int("prefix_len", path.offset/8). Str("series", series). Uint64("series_id", uint64(seriesID)). Msg("match a series") } result = append(result, newSeries(s.context(), seriesID, series, s)) } return nil }) if errScan != nil { return nil, errScan } return result, err } func (s *seriesDB) Search(ctx context.Context, path Path, filter index.Filter, order *OrderBy) (SeriesList, error) { if s.invertedIndex == nil || s.lsmIndex == nil { return nil, errors.New("search is not supported") } if path.isFull { return s.List(ctx, path) } if order == nil { return s.filterSeries(ctx, path, filter) } fieldKey := index.FieldKey{ IndexRuleID: order.Index.GetMetadata().Id, } var iter index.FieldIterator var err error switch order.Index.Type { case databasev1.IndexRule_TYPE_TREE: iter, err = s.lsmIndex.Iterator(fieldKey, rangeOpts, order.Sort) case databasev1.IndexRule_TYPE_INVERTED: iter, err = s.invertedIndex.Iterator(fieldKey, rangeOpts, order.Sort) default: return nil, errUnspecifiedIndexType } if err != nil { return nil, err } defer func() { err = multierr.Append(err, iter.Close()) }() var pl posting.List if pl, err = s.seriesFilter(ctx, path, filter); err != nil { return nil, err } seriesList := make([]Series, 0) for iter.Next() { pv := iter.Val().Value if err = pv.Intersect(pl); err != nil { return nil, err } if pv.IsEmpty() { continue } pIter := pv.Iterator() for pIter.Next() { var series Series if series, err = s.GetByID(common.SeriesID(pIter.Current())); err != nil { return nil, multierr.Append(err, pIter.Close()) } seriesList = append(seriesList, series) } if err = pIter.Close(); err != nil { return nil, err } } return seriesList, err } func (s *seriesDB) filterSeries(ctx context.Context, path Path, filter index.Filter) (SeriesList, error) { var seriesList SeriesList var err error if filter == nil { return s.List(ctx, path) } var pl posting.List if pl, err = filter.Execute(func(ruleType databasev1.IndexRule_Type) (index.Searcher, error) { switch ruleType { case databasev1.IndexRule_TYPE_TREE: return s.lsmIndex, nil case databasev1.IndexRule_TYPE_INVERTED: return s.invertedIndex, nil default: return nil, errUnspecifiedIndexType } }, 0); err != nil { return nil, err } if len(path.seekKey) == 0 { iter := pl.Iterator() defer func() { err = multierr.Append(err, iter.Close()) }() for iter.Next() { var series Series if series, err = s.GetByID(common.SeriesID(iter.Current())); err != nil { return nil, err } seriesList = append(seriesList, series) } return seriesList, err } if seriesList, err = s.List(ctx, path); err != nil { return nil, err } // Remove a series from seriesList if its ID is not in the pl for i := 0; i < len(seriesList); i++ { if !pl.Contains(uint64(seriesList[i].ID())) { seriesList = append(seriesList[:i], seriesList[i+1:]...) i-- } } return seriesList, nil } func (s *seriesDB) seriesFilter(ctx context.Context, path Path, filter index.Filter) (posting.List, error) { var sl SeriesList var err error if sl, err = s.filterSeries(ctx, path, filter); err != nil { return nil, err } pl := roaring.NewPostingList() for _, series := range sl { pl.Insert(uint64(series.ID())) } return pl, nil } func (s *seriesDB) span(ctx context.Context, timeRange timestamp.TimeRange) ([]blockDelegate, error) { result := make([]blockDelegate, 0) for _, s := range s.segCtrl.span(timeRange) { dd, err := s.blockController.span(ctx, timeRange) if err != nil { return nil, err } if dd == nil { continue } result = append(result, dd...) } return result, nil } func (s *seriesDB) create(ctx context.Context, ts time.Time) (blockDelegate, error) { s.Lock() defer s.Unlock() timeRange := timestamp.NewInclusiveTimeRange(ts, ts) ss := s.segCtrl.span(timeRange) if len(ss) > 0 { s := ss[0] dd, err := s.blockController.span(ctx, timeRange) if err != nil { return nil, err } if len(dd) > 0 { return dd[0], nil } block, err := s.blockController.create(timeRange.Start) if err != nil { return nil, err } return block.delegate(ctx) } seg, err := s.segCtrl.create(timeRange.Start) if err != nil { return nil, err } block, err := seg.blockController.create(timeRange.Start) if err != nil { return nil, err } return block.delegate(ctx) } func (s *seriesDB) context() context.Context { return context.WithValue(context.Background(), logger.ContextKey, s.l) } func (s *seriesDB) Close() error { err := s.seriesMetadata.Close() if s.invertedIndex != nil { err = multierr.Append(err, s.invertedIndex.Close()) } if s.lsmIndex != nil { err = multierr.Append(err, s.lsmIndex.Close()) } return err } func newSeriesDataBase(ctx context.Context, shardID common.ShardID, root string, segCtrl *segmentController) (SeriesDatabase, error) { sdb := &seriesDB{ sID: shardID, segCtrl: segCtrl, l: logger.Fetch(ctx, "series_database"), position: common.GetPosition(ctx), } o := ctx.Value(OptionsKey) var options DatabaseOpts if o == nil { options = DatabaseOpts{} } else { options = o.(DatabaseOpts) } var memSize int64 if options.SeriesMemSize > 1 { memSize = int64(options.SeriesMemSize) } else { memSize = defaultKVMemorySize } var err error if sdb.seriesMetadata, err = kv.OpenStore(root+"/md", kv.StoreWithNamedLogger("metadata", sdb.l), kv.StoreWithMemTableSize(memSize), ); err != nil { return nil, err } if options.IndexGranularity == IndexGranularitySeries { if sdb.invertedIndex, err = inverted.NewStore(inverted.StoreOpts{ Path: path.Join(root, componentSecondInvertedIdx), Logger: sdb.l.Named(componentSecondInvertedIdx), BatchWaitSec: options.BlockInvertedIndex.BatchWaitSec, }); err != nil { return nil, err } if sdb.lsmIndex, err = lsm.NewStore(lsm.StoreOpts{ Path: path.Join(root, componentSecondLSMIdx), Logger: sdb.l.Named(componentSecondLSMIdx), MemTableSize: defaultKVMemorySize, }); err != nil { return nil, err } } return sdb, nil } // HashEntity runs hash function (e.g. with xxhash algorithm) on each segment of the Entity, // and concatenates all uint64 in byte array. So the return length of the byte array will be // 8 (every uint64 has 8 bytes) * length of the input. func HashEntity(entity Entity) []byte { result := make([]byte, 0, len(entity)*8) for _, entry := range entity { result = append(result, Hash(entry)...) } return result } // SeriesID transforms Entity to common.SeriesID. func SeriesID(entity Entity) common.SeriesID { return common.SeriesID(convert.Hash(HashEntity(entity))) } // Hash encode Entry to 8 bytes. func Hash(entry []byte) []byte { return convert.Uint64ToBytes(convert.Hash(entry)) } func bytesToSeriesID(data []byte) common.SeriesID { return common.SeriesID(convert.BytesToUint64(data)) } // SeriesList is a collection of Series. type SeriesList []Series func (a SeriesList) Len() int { return len(a) } func (a SeriesList) Less(i, j int) bool { return a[i].ID() < a[j].ID() } func (a SeriesList) Swap(i, j int) { a[i], a[j] = a[j], a[i] } // Merge other SeriesList with this one to create a new SeriesList. func (a SeriesList) Merge(other SeriesList) SeriesList { if len(other) == 0 { return a } sort.Sort(other) if len(a) == 0 { return other } final := SeriesList{} i := 0 j := 0 for i < len(a) && j < len(other) { if a[i].ID() < other[j].ID() { final = append(final, a[i]) i++ } else { // deduplication if a[i].ID() == other[j].ID() { i++ } final = append(final, other[j]) j++ } } for ; i < len(a); i++ { final = append(final, a[i]) } for ; j < len(other); j++ { final = append(final, other[j]) } return final }