pkg/index/inverted/inverted_series.go (399 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Package inverted implements an inverted index repository.
package inverted
import (
"bytes"
"context"
"io"
"time"
"github.com/blugelabs/bluge"
"github.com/blugelabs/bluge/search"
segment "github.com/blugelabs/bluge_segment_api"
"github.com/pkg/errors"
"go.uber.org/multierr"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
// emptySeries is a non-nil, zero-length slice of series documents.
var emptySeries = []index.SeriesDocument{}
// InsertSeriesBatch converts every document in batch with toDoc (collecting
// the field names) and queues it through InsertIfAbsent, then hands the whole
// bluge batch to the writer in a single call.
//
// It returns nil without touching the writer when batch has no documents or
// when the store's closer refuses to register a new running task.
func (s *store) InsertSeriesBatch(batch index.Batch) error {
	if len(batch.Documents) == 0 || !s.closer.AddRunning() {
		return nil
	}
	defer s.closer.Done()

	pending := generateBatch()
	defer releaseBatch(pending)

	for _, source := range batch.Documents {
		converted, names := toDoc(source, true)
		pending.InsertIfAbsent(converted.ID(), names, converted)
	}
	return s.writer.Batch(pending)
}
// UpdateSeriesBatch converts every document in batch with toDoc (field names
// are not collected) and queues it as an update keyed by the document ID,
// then hands the whole bluge batch to the writer in a single call.
//
// It returns nil without touching the writer when batch has no documents or
// when the store's closer refuses to register a new running task.
func (s *store) UpdateSeriesBatch(batch index.Batch) error {
	if len(batch.Documents) == 0 || !s.closer.AddRunning() {
		return nil
	}
	defer s.closer.Done()

	pending := generateBatch()
	defer releaseBatch(pending)

	for _, source := range batch.Documents {
		converted, _ := toDoc(source, false)
		pending.Update(converted.ID(), converted)
	}
	return s.writer.Batch(pending)
}
// Delete queues one delete operation per entry of docID, using the raw bytes
// as the bluge identifier, and submits them to the writer as a single batch.
//
// It returns nil without touching the writer when the store's closer refuses
// to register a new running task.
func (s *store) Delete(docID [][]byte) error {
	if !s.closer.AddRunning() {
		return nil
	}
	defer s.closer.Done()

	pending := generateBatch()
	defer releaseBatch(pending)

	for i := range docID {
		pending.Delete(bluge.Identifier(docID[i]))
	}
	return s.writer.Batch(pending)
}
// toDoc builds a bluge document from d.
//
// The document ID is d.EntityValues. Each entry of d.Fields becomes one bluge
// field named after its marshaled key:
//   - indexed fields are keyword fields, optionally stored, sortable unless
//     NoSort is set, and analyzed when the key names an analyzer;
//   - all other fields are stored-only.
//
// A stored timestamp field is added when d.Timestamp > 0 and a stored-only
// version field when d.Version > 0.
//
// The second result lists the marshaled field names in field order. It is nil
// unless toParseFieldNames is true and d has at least one field.
func toDoc(d index.Document, toParseFieldNames bool) (*bluge.Document, []string) {
	result := bluge.NewDocument(convert.BytesToString(d.EntityValues))

	collectNames := toParseFieldNames && len(d.Fields) > 0
	var names []string
	if collectNames {
		names = make([]string, 0, len(d.Fields))
	}

	for _, f := range d.Fields {
		name := f.Key.Marshal()
		var field *bluge.TermField
		if !f.Index {
			field = bluge.NewStoredOnlyField(name, f.GetBytes())
		} else {
			field = bluge.NewKeywordFieldBytes(name, f.GetBytes())
			if f.Store {
				field.StoreValue()
			}
			if !f.NoSort {
				field.Sortable()
			}
			if f.Key.Analyzer != index.AnalyzerUnspecified {
				field = field.WithAnalyzer(Analyzers[f.Key.Analyzer])
			}
		}
		result.AddField(field)
		if collectNames {
			names = append(names, name)
		}
	}

	if d.Timestamp > 0 {
		stamp := bluge.NewDateTimeField(timestampField, time.Unix(0, d.Timestamp)).StoreValue()
		result.AddField(stamp)
	}
	if d.Version > 0 {
		result.AddField(bluge.NewStoredOnlyField(versionField, convert.Int64ToBytes(d.Version)))
	}
	return result, names
}
// BuildQuery implements index.SeriesStore.
//
// Every series matcher is translated into a bluge query on docIDField (term,
// prefix or wildcard, depending on the matcher type) plus a node describing
// it. Several matchers are OR-ed together (boolean "should" with a minimum of
// one match); a single matcher is used as is. The result requires that primary
// query and, when present, the secondary query and an inclusive date range on
// timestampField built from timeRange.
//
// When seriesMatchers is empty, secondaryQuery is returned unchanged. An
// unsupported matcher type yields an error. A non-nil secondaryQuery must be a
// *queryNode; any other implementation makes the type assertion panic.
func (s *store) BuildQuery(seriesMatchers []index.SeriesMatcher, secondaryQuery index.Query, timeRange *timestamp.TimeRange) (index.Query, error) {
	if len(seriesMatchers) == 0 {
		return secondaryQuery, nil
	}
	qs := make([]bluge.Query, len(seriesMatchers))
	// nodes is pre-sized like qs, so entries are assigned by index. Appending
	// here would leave the first len(seriesMatchers) entries nil and push the
	// real nodes behind them.
	nodes := make([]node, len(seriesMatchers))
	for i := range seriesMatchers {
		match := convert.BytesToString(seriesMatchers[i].Match)
		switch seriesMatchers[i].Type {
		case index.SeriesMatcherTypeExact:
			q := bluge.NewTermQuery(match)
			q.SetField(docIDField)
			qs[i] = q
			nodes[i] = newTermNode(match, nil)
		case index.SeriesMatcherTypePrefix:
			q := bluge.NewPrefixQuery(match)
			q.SetField(docIDField)
			qs[i] = q
			nodes[i] = newPrefixNode(match)
		case index.SeriesMatcherTypeWildcard:
			q := bluge.NewWildcardQuery(match)
			q.SetField(docIDField)
			qs[i] = q
			nodes[i] = newWildcardNode(match)
		default:
			return nil, errors.Errorf("unsupported series matcher type: %v", seriesMatchers[i].Type)
		}
	}

	var primaryQuery bluge.Query
	var primaryNode node
	if len(qs) > 1 {
		bq := bluge.NewBooleanQuery()
		bq.AddShould(qs...)
		bq.SetMinShould(1)
		primaryQuery = bq
		primaryNode = newShouldNode()
		should := primaryNode.(*shouldNode)
		for i := range nodes {
			should.Append(nodes[i])
		}
	} else {
		primaryQuery = qs[0]
		primaryNode = nodes[0]
	}

	rootQuery := bluge.NewBooleanQuery().AddMust(primaryQuery)
	rootNode := newMustNode()
	rootNode.Append(primaryNode)

	if secondaryQuery != nil {
		// Assert once; a nil *queryNode or one without a bluge query adds
		// nothing to the result.
		if sq := secondaryQuery.(*queryNode); sq != nil && sq.query != nil {
			rootQuery.AddMust(sq.query)
			rootNode.Append(sq.node)
		}
	}
	if timeRange != nil {
		q := bluge.NewDateRangeInclusiveQuery(timeRange.Start, timeRange.End, timeRange.IncludeStart, timeRange.IncludeEnd)
		q.SetField(timestampField)
		rootQuery.AddMust(q)
		rootNode.Append(newTimeRangeNode(timeRange))
	}
	return &queryNode{rootQuery, rootNode}, nil
}
// Search implements index.SeriesStore.
//
// It opens a reader on the writer, runs query (which must be a *queryNode) as
// an all-matches request and converts the hits with parseResult, loading the
// projected fields and stopping after limit documents when limit > 0.
// The reader is closed before returning, including when a panic unwinds
// through this function; the panic is then re-raised.
func (s *store) Search(ctx context.Context,
	projection []index.FieldKey, query index.Query, limit int,
) ([]index.SeriesDocument, error) {
	reader, err := s.writer.Reader()
	if err != nil {
		return nil, err
	}
	defer func() {
		recovered := recover()
		_ = reader.Close()
		if recovered != nil {
			panic(recovered)
		}
	}()

	request := bluge.NewAllMatches(query.(*queryNode).query)
	matches, err := reader.Search(ctx, request)
	if err != nil {
		return nil, err
	}
	return parseResult(matches, projection, limit)
}
// parseResult drains dmi into series documents.
//
// For every hit the stored fields are visited: docIDField fills
// Key.EntityValues, timestampField is decoded into Timestamp (Unix
// nanoseconds), versionField into Version, and any stored field whose name
// matches a marshaled key of loadedFields is copied into Fields. Requested
// fields that a hit does not store stay in the map with a nil value. Hits
// without entity values are dropped.
//
// Iteration stops once limit documents have been collected when limit > 0;
// limit <= 0 means no limit. Any iterator, visiting or timestamp-decoding
// error aborts the whole call and returns a nil slice.
func parseResult(dmi search.DocumentMatchIterator, loadedFields []index.FieldKey, limit int) ([]index.SeriesDocument, error) {
	result := make([]index.SeriesDocument, 0, 10)
	next, err := dmi.Next()
	if err != nil {
		return nil, errors.WithMessage(err, "iterate document match iterator")
	}
	fields := make([]string, 0, len(loadedFields))
	for i := range loadedFields {
		fields = append(fields, loadedFields[i].Marshal())
	}
	var hitNumber int
	for err == nil && next != nil {
		hitNumber = next.HitNumber
		var doc index.SeriesDocument
		if len(fields) > 0 {
			// Pre-register the requested names so the visitor can tell
			// projected fields from the rest with a single map lookup.
			doc.Fields = make(map[string][]byte, len(fields))
			for _, name := range fields {
				doc.Fields[name] = nil
			}
		}
		var errTime error
		err = next.VisitStoredFields(func(field string, value []byte) bool {
			switch field {
			case docIDField:
				// Clone like the projected fields below: value is only
				// borrowed from the reader for the duration of the callback,
				// and the caller closes that reader once results are built.
				doc.Key.EntityValues = bytes.Clone(value)
			case timestampField:
				var ts time.Time
				ts, errTime = bluge.DecodeDateTime(value)
				if errTime != nil {
					// Stop visiting; errTime is merged into err below.
					return false
				}
				doc.Timestamp = ts.UnixNano()
			case versionField:
				doc.Version = convert.BytesToInt64(value)
			default:
				if _, ok := doc.Fields[field]; ok {
					doc.Fields[field] = bytes.Clone(value)
				}
			}
			return true
		})
		if err = multierr.Combine(err, errTime); err != nil {
			return nil, errors.WithMessagef(err, "visit stored fields, hit: %d", hitNumber)
		}
		if len(doc.Key.EntityValues) > 0 {
			result = append(result, doc)
		}
		if limit > 0 && len(result) >= limit {
			break
		}
		next, err = dmi.Next()
	}
	if err != nil {
		return nil, errors.WithMessagef(err, "iterate document match iterator, hit: %d", hitNumber)
	}
	return result, nil
}
// SeriesSort returns an iterator over the documents matching indexQuery,
// ordered as requested by orderBy.
//
// The sort key is timestampField for OrderByTypeTime, or the marshaled field
// key of the order-by index rule for OrderByTypeIndex; it is prefixed with "-"
// for descending order. Any other order-by type yields an error. fieldKeys
// names the stored fields to load and preLoadSize is passed to the iterator as
// its size.
//
// When the store's closer refuses a new running task, both results are nil.
// On success the running task registered here is handed to the returned
// iterator together with the closer and the reader.
func (s *store) SeriesSort(ctx context.Context, indexQuery index.Query, orderBy *index.OrderBy,
	preLoadSize int, fieldKeys []index.FieldKey,
) (iter index.FieldIterator[*index.DocumentResult], err error) {
	var sortedKey string
	switch orderBy.Type {
	case index.OrderByTypeTime:
		sortedKey = timestampField
	case index.OrderByTypeIndex:
		fieldKey := index.FieldKey{
			IndexRuleID: orderBy.Index.Metadata.Id,
		}
		sortedKey = fieldKey.Marshal()
	default:
		return nil, errors.Errorf("unsupported order by type: %v", orderBy.Type)
	}
	if orderBy.Sort == modelv1.Sort_SORT_DESC {
		sortedKey = "-" + sortedKey
	}
	fields := make([]string, 0, len(fieldKeys))
	for i := range fieldKeys {
		fields = append(fields, fieldKeys[i].Marshal())
	}

	if !s.closer.AddRunning() {
		return nil, nil
	}
	reader, err := s.writer.Reader()
	if err != nil {
		// No iterator is returned on this path, so nothing else would ever
		// balance the AddRunning above.
		s.closer.Done()
		return nil, err
	}
	return &sortIterator{
		query:       indexQuery,
		fields:      fields,
		reader:      reader,
		sortedKey:   sortedKey,
		size:        preLoadSize,
		closer:      s.closer,
		ctx:         ctx,
		newIterator: newSeriesIterator,
	}, nil
}
// seriesIterator iterates over series documents. It embeds blugeMatchIterator
// for the shared iterator state and adds its own Next and setVal methods.
type seriesIterator struct {
*blugeMatchIterator
}
// newSeriesIterator wraps delegated in a seriesIterator.
//
// The current result's Values map is seeded with one nil entry per name in
// needToLoadFields; setVal only stores values for names present in that map.
// closer is kept on the embedded blugeMatchIterator.
func newSeriesIterator(delegated search.DocumentMatchIterator, closer io.Closer,
	needToLoadFields []string,
) blugeIterator {
	values := make(map[string][]byte, len(needToLoadFields))
	for i := range needToLoadFields {
		values[needToLoadFields[i]] = nil
	}
	base := &blugeMatchIterator{
		delegated: delegated,
		closer:    closer,
		ctx:       search.NewSearchContext(1, 0),
		current:   index.DocumentResult{Values: values},
	}
	return &seriesIterator{blugeMatchIterator: base}
}
// Next advances to the next matching document and loads it into si.current.
//
// It returns false when the delegated iterator fails, when it is exhausted
// (si.err is then io.EOF), or when visiting the stored fields fails; the
// cause is left in si.err. On success si.current holds the document's sort
// value (first sort key, if any) and the stored fields decoded by setVal.
func (si *seriesIterator) Next() bool {
	var match *search.DocumentMatch
	match, si.err = si.delegated.Next()
	if si.err != nil {
		si.err = errors.WithMessagef(si.err, "failed to get next document, hit: %d", si.hit)
		return false
	}
	if match == nil {
		si.err = io.EOF
		return false
	}
	si.hit = match.HitNumber

	// si.current is reused across documents. Keep the requested keys in
	// Values (setVal relies on them) but drop every per-document value so
	// that a field missing from this document does not inherit the previous
	// document's value. Version in particular is only stored when > 0.
	for name := range si.current.Values {
		si.current.Values[name] = nil
	}
	si.current.DocID = 0
	si.current.EntityValues = nil
	si.current.Timestamp = 0
	si.current.Version = 0
	si.current.SortedValue = nil
	if len(match.SortValue) > 0 {
		si.current.SortedValue = match.SortValue[0]
	}

	// setVal may record a decoding error in si.err; merge it with whatever
	// the visit itself reports.
	err := match.VisitStoredFields(si.setVal)
	si.err = multierr.Combine(si.err, err)
	return si.err == nil
}
// setVal is the stored-field visitor used by Next. It copies one stored field
// into si.current: docIDField into EntityValues, timestampField into
// Timestamp (Unix nanoseconds), versionField into Version, and any other
// field into Values when its name was requested at construction time.
//
// It returns false, with the cause in si.err, when the timestamp cannot be
// decoded; otherwise it returns true to keep visiting.
func (si *seriesIterator) setVal(field string, value []byte) bool {
	switch field {
	case docIDField:
		// Clone like the requested values below: value is only borrowed
		// from the reader for the duration of the callback.
		si.current.EntityValues = bytes.Clone(value)
	case timestampField:
		ts, errTime := bluge.DecodeDateTime(value)
		if errTime != nil {
			si.err = errTime
			return false
		}
		si.current.Timestamp = ts.UnixNano()
	case versionField:
		si.current.Version = convert.BytesToInt64(value)
	default:
		if _, ok := si.current.Values[field]; ok {
			si.current.Values[field] = bytes.Clone(value)
		}
	}
	return true
}
// SeriesIterator returns an iterator over all terms of docIDField, each one
// exposed as an index.Series whose EntityValues is the term.
//
// The dictionary iterator is created from a reader, so the reader is kept
// open for the iterator's lifetime and released by the iterator's Close.
// Callers must call Close on the returned iterator.
func (s *store) SeriesIterator(ctx context.Context) (index.FieldIterator[index.Series], error) {
	reader, err := s.writer.Reader()
	if err != nil {
		return nil, err
	}
	dict, err := reader.DictionaryIterator(docIDField, nil, nil, nil)
	if err != nil {
		// Best effort: the dictionary error is the one worth reporting.
		_ = reader.Close()
		return nil, err
	}
	return &dictIterator{dict: dict, reader: reader, ctx: ctx}, nil
}

// dictIterator adapts a segment dictionary iterator to
// index.FieldIterator[index.Series].
type dictIterator struct {
	// dict yields the dictionary entries.
	dict segment.DictionaryIterator
	// reader, when set, is the reader dict was created from; it is closed by
	// Close.
	reader io.Closer
	// ctx is polled periodically by Next to honor cancellation.
	ctx context.Context
	// err is the first error seen by Next; it is reported by Close.
	err error
	// series is the entry most recently produced by Next.
	series index.Series
	// i counts the entries produced so far.
	i int
}

// Next advances to the next dictionary entry. It returns false when the
// dictionary is exhausted, when a previous call failed, when the dictionary
// reports an error, or when ctx is done; ctx is checked each time the number
// of entries produced so far is a multiple of 1000.
func (d *dictIterator) Next() bool {
	if d.err != nil {
		return false
	}
	if d.i%1000 == 0 {
		select {
		case <-d.ctx.Done():
			d.err = d.ctx.Err()
			return false
		default:
		}
	}
	de, err := d.dict.Next()
	if err != nil {
		d.err = err
		return false
	}
	if de == nil {
		return false
	}
	d.series = index.Series{
		EntityValues: convert.StringToBytes(de.Term()),
	}
	d.i++
	return true
}

// Query always returns nil.
func (d *dictIterator) Query() index.Query {
	return nil
}

// Val returns the series produced by the most recent successful Next.
func (d *dictIterator) Val() index.Series {
	return d.series
}

// Close closes the dictionary iterator and, if one is attached, the reader it
// was created from. It returns the combination of the iteration error, if
// any, and the close errors. The reader is closed at most once.
func (d *dictIterator) Close() error {
	err := multierr.Combine(d.err, d.dict.Close())
	if d.reader != nil {
		err = multierr.Combine(err, d.reader.Close())
		d.reader = nil
	}
	return err
}