pkg/index/lsm/iterator.go (179 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package lsm import ( "bytes" "math" "sync" "go.uber.org/multierr" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/kv" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" ) type compositePostingValueFn = func(term, value []byte, delegated kv.Iterator) (*index.PostingValue, error) var ( _ index.FieldIterator = (*fieldIteratorTemplate)(nil) defaultUpper = convert.Uint64ToBytes(math.MaxUint64) defaultLower = convert.Uint64ToBytes(0) ) type fieldIteratorTemplate struct { err error delegated *delegateIterator cur *index.PostingValue fn compositePostingValueFn closer *run.Closer seekKey []byte termRange index.RangeOpts closeOnce sync.Once init bool reverse bool } func (f *fieldIteratorTemplate) Next() bool { if !f.init { f.init = true f.delegated.Seek(f.seekKey) } if !f.delegated.Valid() { return false } pv, err := f.fn(f.delegated.Field().Term, f.delegated.Val(), f.delegated) if err != nil { f.err = err return false } in := f.termRange.Between(pv.Term) switch { case in > 0: if f.reverse { return f.Next() } return false case in < 0: if f.reverse { return false } return f.Next() } f.cur = pv return true } func (f *fieldIteratorTemplate) Val() *index.PostingValue { return f.cur } func (f *fieldIteratorTemplate) Close() (err error) { f.closeOnce.Do(func() { defer f.closer.Done() err = multierr.Combine(f.err, f.delegated.Close()) }) return err } func newFieldIteratorTemplate(l *logger.Logger, fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort, iterable kv.Iterable, closer *run.Closer, fn compositePostingValueFn, ) *fieldIteratorTemplate { if termRange.Upper == nil { termRange.Upper = defaultUpper } if termRange.Lower == nil { termRange.Lower = defaultLower } var reverse bool var term []byte switch order { case modelv1.Sort_SORT_ASC, modelv1.Sort_SORT_UNSPECIFIED: term = termRange.Lower reverse = false case modelv1.Sort_SORT_DESC: term = termRange.Upper reverse = true } iter := iterable.NewIterator(kv.ScanOpts{ Prefix: fieldKey.Marshal(), Reverse: reverse, }) field := index.Field{ Key: fieldKey, Term: term, } return &fieldIteratorTemplate{ delegated: newDelegateIterator(iter, fieldKey, l), termRange: termRange, fn: fn, reverse: reverse, seekKey: field.Marshal(), closer: closer, } } func parseKey(fieldKey index.FieldKey, key []byte) (index.Field, error) { f := &index.Field{ Key: fieldKey, } err := f.Unmarshal(key) if err != nil { return *f, err } return *f, nil } var _ kv.Iterator = (*delegateIterator)(nil) type delegateIterator struct { delegated kv.Iterator l *logger.Logger fieldKeyBytes []byte curField index.Field fieldKey index.FieldKey closed bool } func newDelegateIterator(delegated kv.Iterator, fieldKey index.FieldKey, l *logger.Logger) *delegateIterator { fieldKeyBytes := fieldKey.Marshal() return &delegateIterator{ delegated: delegated, fieldKey: fieldKey, fieldKeyBytes: fieldKeyBytes, l: l, } } func (di *delegateIterator) Next() { di.delegated.Next() } func (di *delegateIterator) Rewind() { di.delegated.Rewind() } func (di *delegateIterator) Seek(key []byte) { di.delegated.Seek(key) } func (di *delegateIterator) Key() []byte { return di.delegated.Key() } func (di *delegateIterator) RawKey() []byte { return di.delegated.RawKey() } func (di *delegateIterator) Field() index.Field { return di.curField } func (di *delegateIterator) Val() []byte { return di.delegated.Val() } func (di *delegateIterator) Valid() bool { if di.closed || !di.delegated.Valid() { return false } var err error di.curField, err = parseKey(di.fieldKey, di.Key()) if err != nil { di.l.Error().Err(err).Msg("fail to parse field from key") return false } if !bytes.Equal(di.curField.Key.Marshal(), di.fieldKeyBytes) { if e := di.l.Debug(); e.Enabled() { e.Uint64("series_id", uint64(di.fieldKey.SeriesID)). Uint32("index_rule_id", di.fieldKey.IndexRuleID). Msg("reached the limitation of the field(series_id+index_rule_id)") } return false } return true } func (di *delegateIterator) Close() error { di.closed = true return di.delegated.Close() }