pkg/index/inverted/inverted.go (446 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// Package inverted implements an inverted index repository.
package inverted

import (
	"context"
	"io"
	"log"
	"strconv"
	"time"

	"github.com/blugelabs/bluge"
	"github.com/blugelabs/bluge/analysis"
	"github.com/blugelabs/bluge/analysis/analyzer"
	blugeIndex "github.com/blugelabs/bluge/index"
	"github.com/blugelabs/bluge/numeric"
	"github.com/blugelabs/bluge/search"
	"github.com/pkg/errors"
	"go.uber.org/multierr"

	"github.com/apache/skywalking-banyandb/api/common"
	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
	"github.com/apache/skywalking-banyandb/pkg/convert"
	"github.com/apache/skywalking-banyandb/pkg/index"
	"github.com/apache/skywalking-banyandb/pkg/index/posting"
	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
	"github.com/apache/skywalking-banyandb/pkg/logger"
	"github.com/apache/skywalking-banyandb/pkg/pool"
	"github.com/apache/skywalking-banyandb/pkg/run"
)

// Names of the internal fields attached to every indexed document, plus the
// batch size constant.
const (
	// docIDField is the field name under which the document ID is read back
	// when a search hit is decoded.
	docIDField = "_id"
	// batchSize is not referenced in this part of the file.
	// NOTE(review): confirm where it is consumed before changing it.
	batchSize = 1024
	// seriesIDField holds the marshaled series ID; Batch stores it and the
	// query builders filter on it.
	seriesIDField = "_series_id"
	// timestampField holds the document timestamp, stored by Batch as a
	// date-time field when the timestamp is positive.
	timestampField = "_timestamp"
	// versionField is not referenced in this part of the file.
	versionField = "_version"
	// sourceField is not referenced in this part of the file.
	sourceField = "_source"
)

var (
	// defaultRangePreloadSize is the preload size Range passes to Iterator.
	defaultRangePreloadSize = 1000
	// defaultProjection lists the doc values MatchTerms and Match ask the
	// match iterator to load for each hit.
	defaultProjection = []string{docIDField, timestampField}
)

// Analyzers is a map
that associates each IndexRule_Analyzer type with a corresponding Analyzer. var Analyzers map[string]*analysis.Analyzer func init() { Analyzers = map[string]*analysis.Analyzer{ index.AnalyzerKeyword: analyzer.NewKeywordAnalyzer(), index.AnalyzerSimple: analyzer.NewSimpleAnalyzer(), index.AnalyzerStandard: analyzer.NewStandardAnalyzer(), index.AnalyzerURL: newURLAnalyzer(), } } var _ index.Store = (*store)(nil) // StoreOpts wraps options to create an inverted index repository. type StoreOpts struct { Logger *logger.Logger Metrics *Metrics Path string BatchWaitSec int64 CacheMaxBytes int } type store struct { writer *bluge.Writer closer *run.Closer l *logger.Logger metrics *Metrics } var batchPool = pool.Register[*blugeIndex.Batch]("index-bluge-batch") func generateBatch() *blugeIndex.Batch { b := batchPool.Get() if b == nil { return bluge.NewBatch() } return b } func releaseBatch(b *blugeIndex.Batch) { b.Reset() batchPool.Put(b) } func (s *store) Batch(batch index.Batch) error { if !s.closer.AddRunning() { return nil } defer s.closer.Done() b := generateBatch() defer releaseBatch(b) for _, d := range batch.Documents { doc := bluge.NewDocument(convert.BytesToString(convert.Uint64ToBytes(d.DocID))) for i, f := range d.Fields { var tf *bluge.TermField switch f.GetTerm().(type) { case *index.BytesTermValue: tf = bluge.NewKeywordFieldBytes(f.Key.Marshal(), f.GetBytes()) case *index.FloatTermValue: tf = bluge.NewNumericField(f.Key.Marshal(), f.GetFloat()) default: return errors.Errorf("unexpected field type: %T", f.GetTerm()) } if !f.NoSort { tf.Sortable() } if f.Store { tf.StoreValue() } if f.Key.Analyzer != index.AnalyzerUnspecified { tf = tf.WithAnalyzer(Analyzers[f.Key.Analyzer]) } doc.AddField(tf) if i == 0 { doc.AddField(bluge.NewKeywordFieldBytes(seriesIDField, f.Key.SeriesID.Marshal()).StoreValue()) } } if d.Timestamp > 0 { doc.AddField(bluge.NewDateTimeField(timestampField, time.Unix(0, d.Timestamp)).StoreValue()) } b.Insert(doc) } return s.writer.Batch(b) } // 
NewStore create a new inverted index repository. func NewStore(opts StoreOpts) (index.SeriesStore, error) { if opts.Logger == nil { opts.Logger = logger.GetLogger("inverted") } indexConfig := blugeIndex.DefaultConfig(opts.Path) if opts.BatchWaitSec > 0 { indexConfig = indexConfig.WithUnsafeBatches(). WithPersisterNapTimeMSec(int(opts.BatchWaitSec * 1000)) } indexConfig.CacheMaxBytes = opts.CacheMaxBytes config := bluge.DefaultConfigWithIndexConfig(indexConfig) config.DefaultSearchAnalyzer = Analyzers[index.AnalyzerKeyword] config.Logger = log.New(opts.Logger, opts.Logger.Module(), 0) w, err := bluge.OpenWriter(config) if err != nil { return nil, err } s := &store{ writer: w, l: opts.Logger, closer: run.NewCloser(1), metrics: opts.Metrics, } return s, nil } func (s *store) Close() error { s.closer.Done() s.closer.CloseThenWait() return s.writer.Close() } func (s *store) Reset() { s.writer.ResetCache() } func (s *store) Iterator(ctx context.Context, fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort, preLoadSize int, ) (iter index.FieldIterator[*index.DocumentResult], err error) { if !termRange.IsEmpty() && !termRange.Valid() { return index.DummyFieldIterator, nil } if !s.closer.AddRunning() { return nil, nil } reader, err := s.writer.Reader() if err != nil { return nil, err } fk := fieldKey.Marshal() rangeQuery := bluge.NewBooleanQuery() rangeNode := newMustNode() rangeQuery = rangeQuery.AddMust(bluge.NewTermQuery(string(fieldKey.SeriesID.Marshal())). SetField(seriesIDField)) rangeNode.Append(newTermNode(string(fieldKey.SeriesID.Marshal()), nil)) if !termRange.IsEmpty() && termRange.Valid() { switch lower := termRange.Lower.(type) { case *index.BytesTermValue: upper := termRange.Upper.(*index.BytesTermValue) rangeQuery.AddMust(bluge.NewTermRangeInclusiveQuery( string(lower.Value), string(upper.Value), termRange.IncludesLower, termRange.IncludesUpper, ). 
SetField(fk)) rangeNode.Append(newTermRangeInclusiveNode( string(lower.Value), string(upper.Value), termRange.IncludesLower, termRange.IncludesUpper, nil, false, )) case *index.FloatTermValue: upper := termRange.Upper.(*index.FloatTermValue) rangeQuery.AddMust(bluge.NewNumericRangeInclusiveQuery( lower.Value, upper.Value, termRange.IncludesLower, termRange.IncludesUpper, ). SetField(fk)) rangeNode.Append(newTermRangeInclusiveNode( strconv.FormatFloat(lower.Value, 'f', -1, 64), strconv.FormatFloat(upper.Value, 'f', -1, 64), termRange.IncludesLower, termRange.IncludesUpper, nil, false, )) default: logger.Panicf("unexpected field type: %T", lower) } } if n := appendTimeRangeToQuery(rangeQuery, fieldKey); n != nil { rangeNode.Append(n) } sortedKey := fk if order == modelv1.Sort_SORT_DESC { sortedKey = "-" + sortedKey } result := &sortIterator{ query: &queryNode{rangeQuery, rangeNode}, reader: reader, sortedKey: sortedKey, size: preLoadSize, closer: s.closer, ctx: ctx, newIterator: newBlugeMatchIterator, } return result, nil } func appendTimeRangeToQuery(query *bluge.BooleanQuery, fieldKey index.FieldKey) node { if fieldKey.TimeRange == nil || !fieldKey.TimeRange.Valid() { return nil } lower := numeric.Float64ToInt64(fieldKey.TimeRange.Lower.(*index.FloatTermValue).Value) upper := numeric.Float64ToInt64(fieldKey.TimeRange.Upper.(*index.FloatTermValue).Value) query.AddMust(bluge.NewDateRangeInclusiveQuery( time.Unix(0, lower), time.Unix(0, upper), fieldKey.TimeRange.IncludesLower, fieldKey.TimeRange.IncludesUpper, ).SetField(timestampField)) return newTermRangeInclusiveNode( strconv.FormatInt(lower, 10), strconv.FormatInt(upper, 10), fieldKey.TimeRange.IncludesLower, fieldKey.TimeRange.IncludesUpper, nil, true, ) } func (s *store) MatchField(fieldKey index.FieldKey) (posting.List, posting.List, error) { return s.Range(fieldKey, index.RangeOpts{}) } func (s *store) MatchTerms(field index.Field) (list posting.List, timestamps posting.List, err error) { reader, err := 
s.writer.Reader() if err != nil { return nil, nil, err } var query *bluge.BooleanQuery switch field.GetTerm().(type) { case *index.BytesTermValue: query = bluge.NewBooleanQuery() query.AddMust(bluge.NewTermQuery(string(field.GetBytes())).SetField(field.Key.Marshal())) case *index.FloatTermValue: query = bluge.NewBooleanQuery() query.AddMust(bluge.NewTermQuery( strconv.FormatFloat(field.GetFloat(), 'f', -1, 64)). SetField(field.Key.Marshal())) case nil: return roaring.DummyPostingList, roaring.DummyPostingList, nil default: return nil, nil, errors.Errorf("unexpected field type: %T", field.GetTerm()) } query.AddMust(bluge.NewTermQuery(string(field.Key.SeriesID.Marshal())). SetField(seriesIDField)) _ = appendTimeRangeToQuery(query, field.Key) documentMatchIterator, err := reader.Search(context.Background(), bluge.NewAllMatches(query)) if err != nil { return nil, nil, err } iter := newBlugeMatchIterator(documentMatchIterator, reader, defaultProjection) defer func() { err = multierr.Append(err, iter.Close()) }() list, timestamps = roaring.NewPostingList(), roaring.NewPostingList() for iter.Next() { list.Insert(iter.Val().DocID) timestamps.Insert(uint64(iter.Val().Timestamp)) } return list, timestamps, err } func (s *store) Match(fieldKey index.FieldKey, matches []string, opts *modelv1.Condition_MatchOption) (posting.List, posting.List, error) { if len(matches) == 0 || fieldKey.Analyzer == index.AnalyzerUnspecified { return roaring.DummyPostingList, roaring.DummyPostingList, nil } reader, err := s.writer.Reader() if err != nil { return nil, nil, err } analyzer, operator := getMatchOptions(fieldKey.Analyzer, opts) fk := fieldKey.Marshal() query := bluge.NewBooleanQuery() query.AddMust(bluge.NewTermQuery(string(fieldKey.SeriesID.Marshal())).SetField(seriesIDField)) for _, m := range matches { query.AddMust(bluge.NewMatchQuery(m).SetField(fk). 
SetAnalyzer(analyzer).SetOperator(operator)) } _ = appendTimeRangeToQuery(query, fieldKey) documentMatchIterator, err := reader.Search(context.Background(), bluge.NewAllMatches(query)) if err != nil { return nil, nil, err } iter := newBlugeMatchIterator(documentMatchIterator, reader, defaultProjection) defer func() { err = multierr.Append(err, iter.Close()) }() list, timestamps := roaring.NewPostingList(), roaring.NewPostingList() for iter.Next() { list.Insert(iter.Val().DocID) timestamps.Insert(uint64(iter.Val().Timestamp)) } return list, timestamps, err } func getMatchOptions(analyzerOnIndexRule string, opts *modelv1.Condition_MatchOption) (*analysis.Analyzer, bluge.MatchQueryOperator) { analyzer := Analyzers[analyzerOnIndexRule] operator := bluge.MatchQueryOperatorOr if opts != nil { if opts.Analyzer != index.AnalyzerUnspecified { analyzer = Analyzers[opts.Analyzer] } if opts.Operator != modelv1.Condition_MatchOption_OPERATOR_UNSPECIFIED { if opts.Operator == modelv1.Condition_MatchOption_OPERATOR_AND { operator = bluge.MatchQueryOperatorAnd } } } return analyzer, bluge.MatchQueryOperator(operator) } func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posting.List, timestamps posting.List, err error) { iter, err := s.Iterator(context.TODO(), fieldKey, opts, modelv1.Sort_SORT_ASC, defaultRangePreloadSize) if err != nil { return roaring.DummyPostingList, roaring.DummyPostingList, err } list, timestamps = roaring.NewPostingList(), roaring.NewPostingList() for iter.Next() { list.Insert(iter.Val().DocID) timestamps.Insert(uint64(iter.Val().Timestamp)) } err = multierr.Append(err, iter.Close()) return } func (s *store) TakeFileSnapshot(dst string) error { reader, err := s.writer.Reader() if err != nil { return err } defer reader.Close() return reader.Backup(dst, nil) } type blugeMatchIterator struct { delegated search.DocumentMatchIterator err error closer io.Closer ctx *search.Context loadDocValues []string current index.DocumentResult hit int 
} func newBlugeMatchIterator(delegated search.DocumentMatchIterator, closer io.Closer, loadDocValues []string, ) blugeIterator { bmi := &blugeMatchIterator{ delegated: delegated, closer: closer, current: index.DocumentResult{}, ctx: search.NewSearchContext(1, 0), loadDocValues: loadDocValues, } return bmi } func (bmi *blugeMatchIterator) Next() bool { var match *search.DocumentMatch match, bmi.err = bmi.delegated.Next() if bmi.err != nil { bmi.err = errors.WithMessagef(bmi.err, "failed to get next document, hit: %d", bmi.hit) return false } if match == nil { bmi.err = io.EOF return false } bmi.hit = match.HitNumber for i := range bmi.current.Values { bmi.current.Values[i] = nil } bmi.current.DocID = 0 bmi.current.SeriesID = 0 bmi.current.Timestamp = 0 bmi.current.SortedValue = nil if len(match.SortValue) > 0 { bmi.current.SortedValue = match.SortValue[0] } if len(bmi.loadDocValues) == 0 { err := match.VisitStoredFields(bmi.setVal) bmi.err = multierr.Combine(bmi.err, err) return bmi.err == nil } if err := match.LoadDocumentValues(bmi.ctx, bmi.loadDocValues); err != nil { bmi.err = multierr.Combine(bmi.err, err) return false } for _, dv := range bmi.loadDocValues { vv := match.DocValues(dv) if len(vv) == 0 { continue } bmi.setVal(dv, vv[0]) } return true } func (bmi *blugeMatchIterator) setVal(field string, value []byte) bool { switch field { case docIDField: bmi.current.DocID = convert.BytesToUint64(value) case seriesIDField: bmi.current.SeriesID = common.SeriesID(convert.BytesToUint64(value)) case timestampField: ts, errTime := bluge.DecodeDateTime(value) if errTime != nil { bmi.err = errTime return false } bmi.current.Timestamp = ts.UnixNano() default: bmi.err = errors.Errorf("unexpected field: %s", field) } return true } func (bmi *blugeMatchIterator) Val() index.DocumentResult { return bmi.current } func (bmi *blugeMatchIterator) Close() error { if bmi.closer == nil { if errors.Is(bmi.err, io.EOF) { return nil } return bmi.err } err := bmi.closer.Close() if 
errors.Is(bmi.err, io.EOF) { return err } return multierr.Combine(bmi.err, bmi.closer.Close()) }