pkg/index/lsm/search.go (73 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package lsm import ( "bytes" "github.com/pkg/errors" "go.uber.org/multierr" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/kv" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/posting" "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring" ) var errUnsupportedOperation = errors.New("unsupported operation") func (s *store) MatchField(fieldKey index.FieldKey) (list posting.List, err error) { return s.Range(fieldKey, index.RangeOpts{}) } func (s *store) MatchTerms(field index.Field) (list posting.List, err error) { list = roaring.NewPostingList() err = s.lsm.GetAll(field.Marshal(), func(itemID []byte) error { list.Insert(convert.BytesToUint64(itemID)) return nil }) if errors.Is(err, kv.ErrKeyNotFound) { return roaring.DummyPostingList, nil } return } func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posting.List, err error) { iter, err := s.Iterator(fieldKey, opts, modelv1.Sort_SORT_ASC) if err != nil { return roaring.DummyPostingList, err } list = roaring.NewPostingList() for iter.Next() { err = multierr.Append(err, list.Union(iter.Val().Value)) } err = multierr.Append(err, iter.Close()) return } func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort) (index.FieldIterator, error) { if !s.closer.AddRunning() { return nil, errors.New("lsm index store is closed") } return newFieldIteratorTemplate(s.l, fieldKey, termRange, order, s.lsm, s.closer, func(term, value []byte, delegated kv.Iterator) (*index.PostingValue, error) { pv := &index.PostingValue{ Term: term, Value: roaring.NewPostingListWithInitialData(convert.BytesToUint64(value)), } for ; delegated.Valid(); delegated.Next() { f := index.Field{} err := f.Unmarshal(delegated.Key()) if err != nil { return nil, err } if !bytes.Equal(f.Term, term) { break } itemID := convert.BytesToUint64(delegated.Val()) if e := s.l.Debug(); e.Enabled() { e.Uint64("series_id", uint64(fieldKey.SeriesID)). Uint64("index_rule_id", uint64(fieldKey.IndexRuleID)). Uint64("item_id", itemID). Msg("fetched item from the index") } pv.Value.Insert(itemID) } return pv, nil }), nil } func (s *store) Match(_ index.FieldKey, _ []string) (posting.List, error) { return nil, errors.WithMessage(errUnsupportedOperation, "LSM-Tree index doesn't support full-text searching") }