pkg/index/inverted/inverted.go

// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Package inverted implements an inverted index repository.
package inverted

import (
	"bytes"
	"context"
	"errors"
	"io"
	"log"
	"math"
	"time"

	"github.com/blugelabs/bluge"
	"github.com/blugelabs/bluge/analysis"
	"github.com/blugelabs/bluge/analysis/analyzer"
	blugeIndex "github.com/blugelabs/bluge/index"
	"github.com/blugelabs/bluge/search"
	"github.com/dgraph-io/badger/v3/y"
	"go.uber.org/multierr"

	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
	"github.com/apache/skywalking-banyandb/pkg/convert"
	"github.com/apache/skywalking-banyandb/pkg/index"
	"github.com/apache/skywalking-banyandb/pkg/index/posting"
	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
	"github.com/apache/skywalking-banyandb/pkg/logger"
	"github.com/apache/skywalking-banyandb/pkg/run"
)

const (
docIDField = "_id"
batchSize = 1024
seriesIDField = "series_id"
idField = "id"
)
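
// defaultUpper and defaultLower bound range queries whose upper or lower
// limit is left unspecified.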
var (
defaultUpper = convert.Uint64ToBytes(math.MaxUint64)
defaultLower = convert.Uint64ToBytes(0)
)
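
// analyzers maps index-rule analyzer kinds to the bluge analyzers used at
// both index and search time.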
var analyzers = map[databasev1.IndexRule_Analyzer]*analysis.Analyzer{
	databasev1.IndexRule_ANALYZER_KEYWORD:  analyzer.NewKeywordAnalyzer(),
	databasev1.IndexRule_ANALYZER_SIMPLE:   analyzer.NewSimpleAnalyzer(),
	databasev1.IndexRule_ANALYZER_STANDARD: analyzer.NewStandardAnalyzer(),
}

var _ index.Store = (*store)(nil)

// StoreOpts wraps options to create an inverted index repository.
type StoreOpts struct {
Logger *logger.Logger
Path string
BatchWaitSec int64
}
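
// doc is a document queued for indexing, together with its document ID.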
type doc struct {
fields []index.Field
docID uint64
}
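
// flushEvent requests an immediate flush of the pending batch; onComplete is
// closed once the flush has finished.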
type flushEvent struct {
onComplete chan struct{}
}
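
// store is an inverted index repository backed by bluge. Writes are buffered
// on ch and applied in batches by a background goroutine started in run.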
type store struct {
writer *bluge.Writer
ch chan any
closer *run.Closer
l *logger.Logger
batchInterval time.Duration
}

// NewStore creates a new inverted index repository.
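//
// A minimal usage sketch (l is an existing *logger.Logger; the path below is
// hypothetical):
//
//	s, err := NewStore(StoreOpts{Logger: l, Path: "/tmp/inverted-idx", BatchWaitSec: 1})
//	if err != nil {
//		// handle the error
//	}
//	defer s.Close()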
func NewStore(opts StoreOpts) (index.Store, error) {
indexConfig := blugeIndex.DefaultConfig(opts.Path).WithUnsafeBatches().
WithPersisterNapTimeMSec(60 * 1000)
indexConfig.MergePlanOptions.MaxSegmentsPerTier = 1
indexConfig.MergePlanOptions.MaxSegmentSize = 500000
indexConfig.MergePlanOptions.SegmentsPerMergeTask = 20
config := bluge.DefaultConfigWithIndexConfig(indexConfig)
config.DefaultSearchAnalyzer = analyzers[databasev1.IndexRule_ANALYZER_KEYWORD]
config.Logger = log.New(opts.Logger, opts.Logger.Module(), 0)
w, err := bluge.OpenWriter(config)
if err != nil {
return nil, err
}
sec := opts.BatchWaitSec
if sec < 1 {
sec = 1
}
s := &store{
writer: w,
		batchInterval: time.Duration(sec) * time.Second,
l: opts.Logger,
ch: make(chan any, batchSize),
closer: run.NewCloser(1),
}
s.run()
return s, nil
}
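
// Close shuts down the background batch loop and closes the underlying writer.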
func (s *store) Close() error {
s.closer.CloseThenWait()
return s.writer.Close()
}
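
// Write enqueues the fields of a document for asynchronous indexing. Indexing
// errors are logged by the batch loop rather than returned here.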
func (s *store) Write(fields []index.Field, docID uint64) error {
if len(fields) < 1 {
return nil
}
if !s.closer.AddRunning() {
return nil
}
defer s.closer.Done()
select {
case <-s.closer.CloseNotify():
case s.ch <- doc{
fields: fields,
docID: docID,
}:
}
return nil
}
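
// Iterator returns a term iterator over fieldKey restricted to termRange,
// sorted by term in the given order. Missing bounds default to the full
// uint64 range.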
func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort) (iter index.FieldIterator, err error) {
if termRange.Lower != nil &&
termRange.Upper != nil &&
bytes.Compare(termRange.Lower, termRange.Upper) > 0 {
return index.DummyFieldIterator, nil
}
if termRange.Upper == nil {
termRange.Upper = defaultUpper
}
if termRange.Lower == nil {
termRange.Lower = defaultLower
}
reader, err := s.writer.Reader()
if err != nil {
return nil, err
}
fk := fieldKey.MarshalIndexRule()
var query bluge.Query
shouldDecodeTerm := true
if fieldKey.Analyzer == databasev1.IndexRule_ANALYZER_UNSPECIFIED {
query = bluge.NewTermRangeInclusiveQuery(
index.FieldStr(fieldKey, termRange.Lower),
index.FieldStr(fieldKey, termRange.Upper),
termRange.IncludesLower,
termRange.IncludesUpper,
).
SetField(fk)
} else {
shouldDecodeTerm = false
bQuery := bluge.NewBooleanQuery().
AddMust(bluge.NewTermRangeInclusiveQuery(
string(termRange.Lower),
string(termRange.Upper),
termRange.IncludesLower,
termRange.IncludesUpper,
).
SetField(fk))
if fieldKey.HasSeriesID() {
bQuery.AddMust(bluge.NewTermQuery(string(fieldKey.SeriesID.Marshal())).SetField(seriesIDField))
}
query = bQuery
}
sortedKey := fk
if order == modelv1.Sort_SORT_DESC {
sortedKey = "-" + sortedKey
}
documentMatchIterator, err := reader.Search(context.Background(), bluge.NewTopNSearch(math.MaxInt64, query).SortBy([]string{sortedKey}))
if err != nil {
return nil, err
}
result := newBlugeMatchIterator(documentMatchIterator, fk, shouldDecodeTerm, reader)
return &result, nil
}
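
// MatchField returns all documents containing the given field, whatever its value.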
func (s *store) MatchField(fieldKey index.FieldKey) (list posting.List, err error) {
return s.Range(fieldKey, index.RangeOpts{})
}
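
// MatchTerms returns the posting list of documents containing the given
// field with the exact term.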
func (s *store) MatchTerms(field index.Field) (list posting.List, err error) {
reader, err := s.writer.Reader()
if err != nil {
return nil, err
}
fk := field.Key.MarshalIndexRule()
var query bluge.Query
shouldDecodeTerm := true
if field.Key.Analyzer == databasev1.IndexRule_ANALYZER_UNSPECIFIED {
query = bluge.NewTermQuery(string(field.Marshal())).SetField(fk)
} else {
shouldDecodeTerm = false
bQuery := bluge.NewBooleanQuery().
AddMust(bluge.NewTermQuery(string(field.Term)).SetField(fk))
if field.Key.HasSeriesID() {
bQuery.AddMust(bluge.NewTermQuery(string(field.Key.SeriesID.Marshal())).SetField(seriesIDField))
}
query = bQuery
}
documentMatchIterator, err := reader.Search(context.Background(), bluge.NewAllMatches(query))
if err != nil {
return nil, err
}
iter := newBlugeMatchIterator(documentMatchIterator, fk, shouldDecodeTerm, reader)
defer func() {
err = multierr.Append(err, iter.Close())
}()
list = roaring.NewPostingList()
for iter.Next() {
err = multierr.Append(err, list.Union(iter.Val().Value))
}
return list, err
}
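
// Match returns documents whose analyzed field matches every one of the given
// match phrases. It returns an empty posting list when no phrases are given
// or the field has no analyzer configured.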
func (s *store) Match(fieldKey index.FieldKey, matches []string) (posting.List, error) {
if len(matches) == 0 || fieldKey.Analyzer == databasev1.IndexRule_ANALYZER_UNSPECIFIED {
return roaring.DummyPostingList, nil
}
reader, err := s.writer.Reader()
if err != nil {
return nil, err
}
	fieldAnalyzer := analyzers[fieldKey.Analyzer]
fk := fieldKey.MarshalIndexRule()
query := bluge.NewBooleanQuery()
if fieldKey.HasSeriesID() {
query.AddMust(bluge.NewTermQuery(string(fieldKey.SeriesID.Marshal())).SetField(seriesIDField))
}
	for _, m := range matches {
		query.AddMust(bluge.NewMatchQuery(m).SetField(fk).
			SetAnalyzer(fieldAnalyzer))
	}
documentMatchIterator, err := reader.Search(context.Background(), bluge.NewAllMatches(query))
if err != nil {
return nil, err
}
iter := newBlugeMatchIterator(documentMatchIterator, fk, false, reader)
defer func() {
err = multierr.Append(err, iter.Close())
}()
list := roaring.NewPostingList()
for iter.Next() {
err = multierr.Append(err, list.Union(iter.Val().Value))
}
return list, err
}
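
// Range unions the posting lists of all terms of the field that fall within
// the given range.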
func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posting.List, err error) {
iter, err := s.Iterator(fieldKey, opts, modelv1.Sort_SORT_ASC)
if err != nil {
return roaring.DummyPostingList, err
}
list = roaring.NewPostingList()
for iter.Next() {
err = multierr.Append(err, list.Union(iter.Val().Value))
}
err = multierr.Append(err, iter.Close())
return
}
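
// SizeOnDisk returns the number of bytes the index currently occupies on disk.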
func (s *store) SizeOnDisk() int64 {
	_, sizeOnDisk := s.writer.DirectoryStats()
	return int64(sizeOnDisk)
}
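
// run starts the background loop that drains the write channel into a bluge
// batch, flushing when the batch reaches batchSize, when the batch interval
// elapses, or when a flushEvent arrives.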
func (s *store) run() {
go func() {
defer s.closer.Done()
size := 0
batch := bluge.NewBatch()
flush := func() {
if size < 1 {
return
}
if err := s.writer.Batch(batch); err != nil {
s.l.Error().Err(err).Msg("write to the inverted index")
}
batch.Reset()
size = 0
}
var docIDBuffer bytes.Buffer
for {
timer := time.NewTimer(s.batchInterval)
select {
			case <-s.closer.CloseNotify():
				timer.Stop()
				// Persist anything still batched before shutting down.
				flush()
				return
			case event, more := <-s.ch:
				if !more {
					timer.Stop()
					flush()
					return
				}
switch d := event.(type) {
case flushEvent:
flush()
close(d.onComplete)
case doc:
// TODO: generate a segment directly.
fk := d.fields[0].Key
docIDBuffer.Reset()
if fk.HasSeriesID() {
docIDBuffer.Write(fk.SeriesID.Marshal())
}
docIDBuffer.Write(convert.Uint64ToBytes(d.docID))
doc := bluge.NewDocument(docIDBuffer.String())
toAddSeriesIDField := false
for _, f := range d.fields {
if f.Key.Analyzer == databasev1.IndexRule_ANALYZER_UNSPECIFIED {
doc.AddField(bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), f.Marshal()).StoreValue().Sortable())
} else {
toAddSeriesIDField = true
doc.AddField(bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), f.Term).StoreValue().Sortable().
WithAnalyzer(analyzers[f.Key.Analyzer]))
}
}
if toAddSeriesIDField && fk.HasSeriesID() {
doc.AddField(bluge.NewKeywordFieldBytes(seriesIDField, fk.SeriesID.Marshal()))
}
size++
batch.Update(doc.ID(), doc)
if size >= batchSize {
flush()
}
}
case <-timer.C:
flush()
}
timer.Stop()
}
}()
}
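
// flush blocks until the background loop has written out the pending batch.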
func (s *store) flush() {
if !s.closer.AddRunning() {
return
}
defer s.closer.Done()
onComplete := make(chan struct{})
select {
case <-s.closer.CloseNotify():
case s.ch <- flushEvent{onComplete: onComplete}:
}
<-onComplete
}
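
// blugeMatchIterator adapts a bluge search.DocumentMatchIterator to
// index.FieldIterator, merging consecutive matches that share a term into a
// single posting value.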
type blugeMatchIterator struct {
delegated search.DocumentMatchIterator
err error
closer io.Closer
current *index.PostingValue
agg *index.PostingValue
fieldKey string
shouldDecodeTerm bool
closed bool
}
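
// newBlugeMatchIterator wraps a document match iterator; closer (typically
// the index reader) is closed along with the iterator.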
func newBlugeMatchIterator(delegated search.DocumentMatchIterator, fieldKey string, shouldDecodeTerm bool, closer io.Closer) blugeMatchIterator {
return blugeMatchIterator{
delegated: delegated,
fieldKey: fieldKey,
shouldDecodeTerm: shouldDecodeTerm,
closer: closer,
}
}
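
// Next advances to the next posting value. It returns false once the matches
// are exhausted or an error has occurred.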
func (bmi *blugeMatchIterator) Next() bool {
if bmi.err != nil || bmi.closed {
return false
}
//revive:disable:empty-block
for bmi.nextTerm() {
}
//revive:enable:empty-block
if bmi.err != nil || bmi.closed {
return false
}
return true
}
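
// nextTerm folds the next document match into the posting value under
// construction. It returns true while aggregation should continue and false
// once a posting value is ready in current, an error occurs, or the matches
// are exhausted.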
func (bmi *blugeMatchIterator) nextTerm() bool {
var match *search.DocumentMatch
match, bmi.err = bmi.delegated.Next()
if bmi.err != nil {
return false
}
if match == nil {
if bmi.agg == nil {
bmi.closed = true
} else {
bmi.current = bmi.agg
bmi.agg = nil
}
return false
}
i := 0
var docID uint64
var term []byte
bmi.err = match.VisitStoredFields(func(field string, value []byte) bool {
if field == docIDField {
if len(value) == 8 {
docID = convert.BytesToUint64(value)
} else if len(value) == 16 {
				// value layout: seriesID (8 bytes) + docID (8 bytes)
docID = convert.BytesToUint64(value[8:])
}
i++
}
if field == bmi.fieldKey {
v := y.Copy(value)
if bmi.shouldDecodeTerm {
term = index.UnmarshalTerm(v)
} else {
term = v
}
i++
}
return i < 2
})
	if bmi.err != nil {
		return false
	}
	if i != 2 {
		// Ignore documents missing either the doc ID or the field value.
		// TODO: add a metric to accumulate ignored docs.
		return true
	}
if bmi.agg == nil {
bmi.agg = &index.PostingValue{
Term: term,
Value: roaring.NewPostingListWithInitialData(docID),
}
return true
}
if bytes.Equal(bmi.agg.Term, term) {
bmi.agg.Value.Insert(docID)
return true
}
bmi.current = bmi.agg
bmi.agg = &index.PostingValue{
Term: term,
Value: roaring.NewPostingListWithInitialData(docID),
}
return false
}
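
// Val returns the posting value produced by the most recent call to Next.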
func (bmi *blugeMatchIterator) Val() *index.PostingValue {
return bmi.current
}
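
// Close releases the underlying reader and returns any error recorded during
// iteration joined with the close error.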
func (bmi *blugeMatchIterator) Close() error {
bmi.closed = true
return errors.Join(bmi.err, bmi.closer.Close())
}