banyand/tsdb/series_seek_sort.go (228 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package tsdb import ( "sort" "time" "github.com/pkg/errors" "go.uber.org/multierr" "github.com/apache/skywalking-banyandb/api/common" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/kv" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/posting" "github.com/apache/skywalking-banyandb/pkg/logger" ) var ( errUnspecifiedIndexType = errors.New("Unspecified index type") emptyFilters = make([]filterFn, 0) rangeOpts = index.RangeOpts{} ) func (s *seekerBuilder) OrderByIndex(indexRule *databasev1.IndexRule, order modelv1.Sort) SeekerBuilder { s.indexRuleForSorting = indexRule s.order = order return s } func (s *seekerBuilder) OrderByTime(order modelv1.Sort) SeekerBuilder { s.order = order s.indexRuleForSorting = nil return s } func (s *seekerBuilder) buildSeries() ([]Iterator, error) { if s.indexRuleForSorting == nil { return s.buildSeriesByTime() } return s.buildSeriesByIndex() } func (s *seekerBuilder) buildSeriesByIndex() (series []Iterator, err error) { timeFilter := func(item Item) bool { valid := s.seriesSpan.timeRange.Contains(item.Time()) timeRange := s.seriesSpan.timeRange s.seriesSpan.l.Trace(). Times("time_range", []time.Time{timeRange.Start, timeRange.End}). Bool("valid", valid).Msg("filter item by time range") return valid } for _, b := range s.seriesSpan.blocks { var inner index.FieldIterator var err error fieldKey := index.FieldKey{ SeriesID: s.seriesSpan.seriesID, IndexRuleID: s.indexRuleForSorting.GetMetadata().GetId(), } filters := []filterFn{timeFilter} filter, err := s.buildIndexFilter(b) if err != nil { return nil, err } if filter != nil { filters = append(filters, filter) } switch s.indexRuleForSorting.GetType() { case databasev1.IndexRule_TYPE_TREE: inner, err = b.lsmIndexReader().Iterator(fieldKey, rangeOpts, s.order) case databasev1.IndexRule_TYPE_INVERTED: inner, err = b.invertedIndexReader().Iterator(fieldKey, rangeOpts, s.order) case databasev1.IndexRule_TYPE_UNSPECIFIED: return nil, errors.WithMessagef(errUnspecifiedIndexType, "index rule:%v", s.indexRuleForSorting) } if err != nil { return nil, err } if inner != nil { series = append(series, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, filters)) } } return } func (s *seekerBuilder) buildSeriesByTime() ([]Iterator, error) { bb := s.seriesSpan.blocks switch s.order { case modelv1.Sort_SORT_ASC, modelv1.Sort_SORT_UNSPECIFIED: sort.SliceStable(bb, func(i, j int) bool { return bb[i].startTime().Before(bb[j].startTime()) }) case modelv1.Sort_SORT_DESC: sort.SliceStable(bb, func(i, j int) bool { return bb[i].startTime().After(bb[j].startTime()) }) } delegated := make([]Iterator, 0, len(bb)) bTimes := make([]time.Time, 0, len(bb)) timeRange := s.seriesSpan.timeRange termRange := index.RangeOpts{ Lower: convert.Int64ToBytes(timeRange.Start.UnixNano()), Upper: convert.Int64ToBytes(timeRange.End.UnixNano()), IncludesLower: true, } for _, b := range bb { bTimes = append(bTimes, b.startTime()) inner, err := b.primaryIndexReader(). Iterator( index.FieldKey{ SeriesID: s.seriesSpan.seriesID, }, termRange, s.order, ) if err != nil { return nil, err } if inner != nil { filter, err := s.buildIndexFilter(b) if err != nil { return nil, err } if filter == nil { delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, emptyFilters)) } else { delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, []filterFn{filter})) } } } if e := s.seriesSpan.l.Debug(); e.Enabled() { e.Str("order", modelv1.Sort_name[int32(s.order)]). Times("blocks", bTimes). Uint64("series_id", uint64(s.seriesSpan.seriesID)). Str("series", s.seriesSpan.series). Int("shard_id", int(s.seriesSpan.shardID)). Msg("seek series by time") } return []Iterator{newMergedIterator(delegated)}, nil } var _ Iterator = (*searcherIterator)(nil) type searcherIterator struct { fieldIterator index.FieldIterator cur posting.Iterator data kv.TimeSeriesReader l *logger.Logger curKey []byte filters []filterFn seriesID common.SeriesID } func (s *searcherIterator) Next() bool { if s.cur == nil { if s.fieldIterator.Next() { v := s.fieldIterator.Val() s.cur = v.Value.Iterator() s.curKey = v.Term } else { return false } } if s.cur.Next() { for _, filter := range s.filters { if !filter(s.Val()) { return s.Next() } } if e := s.l.Debug(); e.Enabled() { e.Uint64("series_id", uint64(s.seriesID)).Uint64("item_id", uint64(s.Val().ID())).Msg("got an item") } return true } s.cur = nil return s.Next() } func (s *searcherIterator) Val() Item { return &item{ sortedField: s.curKey, itemID: common.ItemID(s.cur.Current()), data: s.data, seriesID: s.seriesID, } } func (s *searcherIterator) Close() error { return s.fieldIterator.Close() } func newSearcherIterator(l *logger.Logger, fieldIterator index.FieldIterator, data kv.TimeSeriesReader, seriesID common.SeriesID, filters []filterFn, ) Iterator { return &searcherIterator{ fieldIterator: fieldIterator, data: data, seriesID: seriesID, filters: filters, l: l, } } var _ Iterator = (*mergedIterator)(nil) type mergedIterator struct { curr Iterator delegated []Iterator index int } func (m *mergedIterator) Next() bool { if m.curr == nil { m.index++ if m.index >= len(m.delegated) { return false } m.curr = m.delegated[m.index] } hasNext := m.curr.Next() if !hasNext { m.curr = nil return m.Next() } return true } func (m *mergedIterator) Val() Item { return m.curr.Val() } func (m *mergedIterator) Close() error { var err error for _, d := range m.delegated { err = multierr.Append(err, d.Close()) } return err } func newMergedIterator(delegated []Iterator) Iterator { return &mergedIterator{ index: -1, delegated: delegated, } }