banyand/internal/storage/index.go

// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package storage

import (
	"context"
	"maps"
	"path"

	"github.com/pkg/errors"
	"go.uber.org/multierr"

	"github.com/apache/skywalking-banyandb/api/common"
	"github.com/apache/skywalking-banyandb/pkg/index"
	"github.com/apache/skywalking-banyandb/pkg/index/inverted"
	"github.com/apache/skywalking-banyandb/pkg/logger"
	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
	"github.com/apache/skywalking-banyandb/pkg/query"
	"github.com/apache/skywalking-banyandb/pkg/timestamp"
)

const seriesIndexDirName = "sidx"

func (s *segment[T, O]) IndexDB() IndexDB {
	return s.index
}

func (s *segment[T, O]) Lookup(ctx context.Context, series []*pbv1.Series) (pbv1.SeriesList, error) {
	sl, err := s.index.filter(ctx, series, nil, nil, nil)
	return sl.SeriesList, err
}

// seriesIndex wraps an inverted series store and provides insert, update,
// and search operations over series documents for a segment.
type seriesIndex struct {
	store   index.SeriesStore
	l       *logger.Logger
	metrics *inverted.Metrics
	p       common.Position
}

// newSeriesIndex opens the inverted series store under root/sidx.
func newSeriesIndex(ctx context.Context, root string, flushTimeoutSeconds int64, cacheMaxBytes int,
	metrics *inverted.Metrics,
) (*seriesIndex, error) {
	si := &seriesIndex{
		l: logger.Fetch(ctx, "series_index"),
		p: common.GetPosition(ctx),
	}
	opts := inverted.StoreOpts{
		Path:          path.Join(root, seriesIndexDirName),
		Logger:        si.l,
		BatchWaitSec:  flushTimeoutSeconds,
		CacheMaxBytes: cacheMaxBytes,
	}
	if metrics != nil {
		opts.Metrics = metrics
		si.metrics = opts.Metrics
	}
	var err error
	if si.store, err = inverted.NewStore(opts); err != nil {
		return nil, err
	}
	return si, nil
}

func (s *seriesIndex) Insert(docs index.Documents) error {
	return s.store.InsertSeriesBatch(index.Batch{
		Documents: docs,
	})
}

func (s *seriesIndex) Update(docs index.Documents) error {
	return s.store.UpdateSeriesBatch(index.Batch{
		Documents: docs,
	})
}
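
// Usage sketch (illustrative only, not part of the original source): how a
// caller might build a segment's series index and write a batch of series
// documents. The root path, flush timeout, cache size, and the "docs" and
// "segMetrics" values are placeholders assumed for this example.
//
//	si, err := newSeriesIndex(ctx, "/path/to/segment", 10, 32<<20, segMetrics)
//	if err != nil {
//		return err
//	}
//	// docs is an index.Documents batch prepared by the caller.
//	if err := si.Insert(docs); err != nil {
//		return err
//	}
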
// filter matches the given series (plus an optional secondary query and time
// range) against the store and returns the matched series data.
func (s *seriesIndex) filter(ctx context.Context, series []*pbv1.Series,
	projection []index.FieldKey, secondaryQuery index.Query, timeRange *timestamp.TimeRange,
) (data SeriesData, err error) {
	if len(series) == 0 && secondaryQuery == nil {
		return data, nil
	}
	var seriesMatchers []index.SeriesMatcher
	if len(series) > 0 {
		seriesMatchers = make([]index.SeriesMatcher, len(series))
		for i := range series {
			seriesMatchers[i], err = convertEntityValuesToSeriesMatcher(series[i])
			if err != nil {
				return SeriesData{}, err
			}
		}
	}
	indexQuery, err := s.store.BuildQuery(seriesMatchers, secondaryQuery, timeRange)
	if err != nil {
		return SeriesData{}, err
	}
	tracer := query.GetTracer(ctx)
	if tracer != nil {
		span, _ := tracer.StartSpan(ctx, "seriesIndex.search")
		span.Tagf("query", "%s", indexQuery.String())
		defer func() {
			span.Tagf("matched", "%d", len(data.SeriesList))
			if len(data.Fields) > 0 {
				span.Tagf("field_length", "%d", len(data.Fields[0]))
			}
			if err != nil {
				span.Error(err)
			}
			span.Stop()
		}()
	}
	ss, err := s.store.Search(ctx, projection, indexQuery, 0)
	if err != nil {
		return SeriesData{}, err
	}
	if len(ss) == 0 {
		return SeriesData{}, nil
	}
	data.SeriesList, data.Fields, data.Timestamps, data.Versions, err = convertIndexSeriesToSeriesList(ss, len(projection) > 0)
	if err != nil {
		return SeriesData{}, errors.WithMessagef(err, "failed to convert index series to series list, matchers: %v, matched: %d", seriesMatchers, len(ss))
	}
	return data, nil
}

var emptySeriesMatcher = index.SeriesMatcher{}

// convertEntityValuesToSeriesMatcher maps a series' entity values to an exact,
// prefix, or wildcard matcher, depending on where pbv1.AnyTagValue appears.
func convertEntityValuesToSeriesMatcher(series *pbv1.Series) (index.SeriesMatcher, error) {
	var hasAny, hasWildcard bool
	var prefixIndex int
	var localSeries pbv1.Series
	series.CopyTo(&localSeries)
	for i, tv := range localSeries.EntityValues {
		if tv == nil {
			return emptySeriesMatcher, errors.New("unexpected nil tag value")
		}
		if tv == pbv1.AnyTagValue {
			if !hasAny {
				hasAny = true
				prefixIndex = i
			}
			continue
		}
		if hasAny {
			hasWildcard = true
			break
		}
	}
	var err error
	if hasAny {
		if hasWildcard {
			if err = localSeries.MarshalWithWildcard(); err != nil {
				return emptySeriesMatcher, err
			}
			return index.SeriesMatcher{
				Type:  index.SeriesMatcherTypeWildcard,
				Match: localSeries.Buffer,
			}, nil
		}
		localSeries.EntityValues = localSeries.EntityValues[:prefixIndex]
		if err = localSeries.Marshal(); err != nil {
			return emptySeriesMatcher, err
		}
		return index.SeriesMatcher{
			Type:  index.SeriesMatcherTypePrefix,
			Match: localSeries.Buffer,
		}, nil
	}
	if err = localSeries.Marshal(); err != nil {
		return emptySeriesMatcher, err
	}
	return index.SeriesMatcher{
		Type:  index.SeriesMatcherTypeExact,
		Match: localSeries.Buffer,
	}, nil
}

// convertIndexSeriesToSeriesList unmarshals the matched documents into a
// series list and collects their fields, timestamps, and versions.
func convertIndexSeriesToSeriesList(indexSeries []index.SeriesDocument, hasFields bool) (pbv1.SeriesList, FieldResultList, []int64, []int64, error) {
	seriesList := make(pbv1.SeriesList, 0, len(indexSeries))
	var fields FieldResultList
	if hasFields {
		fields = make(FieldResultList, 0, len(indexSeries))
	}
	var timestamps, versions []int64
	for _, s := range indexSeries {
		var series pbv1.Series
		if err := series.Unmarshal(s.Key.EntityValues); err != nil {
			return nil, nil, nil, nil, errors.WithMessagef(err, "failed to unmarshal series: %s", s.Key.EntityValues)
		}
		seriesList = append(seriesList, &series)
		if fields != nil {
			fields = append(fields, s.Fields)
		}
		if s.Timestamp > 0 {
			timestamps = append(timestamps, s.Timestamp)
		}
		if s.Version > 0 {
			versions = append(versions, s.Version)
		}
	}
	return seriesList, fields, timestamps, versions, nil
}
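
// Reading aid (hypothetical entity values, not from the original file): how
// convertEntityValuesToSeriesMatcher above picks a matcher type.
//
//	["svc", "inst", "ep"]              -> exact matcher on the marshaled series
//	["svc", AnyTagValue, AnyTagValue]  -> prefix matcher on ["svc"]
//	["svc", AnyTagValue, "ep"]         -> wildcard matcher (a concrete value follows AnyTagValue)
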
// Search looks up series by the given matchers. When opts.Order.Index is set,
// it streams results in sorted order via SeriesSort; otherwise it falls back
// to the unsorted filter path.
func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series,
	opts IndexSearchOpts,
) (sd SeriesData, sortedValues [][]byte, err error) {
	tracer := query.GetTracer(ctx)
	if tracer != nil {
		var span *query.Span
		span, ctx = tracer.StartSpan(ctx, "seriesIndex.Search")
		if opts.Query != nil {
			span.Tagf("secondary_query", "%s", opts.Query.String())
		}
		defer func() {
			if err != nil {
				span.Error(err)
			}
			span.Stop()
		}()
	}
	if opts.Order == nil || opts.Order.Index == nil {
		sd, err = s.filter(ctx, series, opts.Projection, opts.Query, opts.TimeRange)
		if err != nil {
			return sd, nil, err
		}
		return sd, nil, nil
	}
	var span *query.Span
	if tracer != nil {
		span, _ = tracer.StartSpan(ctx, "sort")
		span.Tagf("preload", "%d", opts.PreloadSize)
		defer func() {
			if err != nil {
				span.Error(err)
			}
			span.Stop()
		}()
	}
	seriesMatchers := make([]index.SeriesMatcher, len(series))
	for i := range series {
		seriesMatchers[i], err = convertEntityValuesToSeriesMatcher(series[i])
		if err != nil {
			return sd, nil, err
		}
	}
	query, err := s.store.BuildQuery(seriesMatchers, opts.Query, opts.TimeRange)
	if err != nil {
		return sd, nil, err
	}
	iter, err := s.store.SeriesSort(ctx, query, opts.Order, opts.PreloadSize, opts.Projection)
	if err != nil {
		return sd, nil, err
	}
	defer func() {
		err = multierr.Append(err, iter.Close())
	}()
	var r int
	for iter.Next() {
		r++
		val := iter.Val()
		var series pbv1.Series
		if err = series.Unmarshal(val.EntityValues); err != nil {
			return sd, nil, errors.WithMessagef(err, "failed to unmarshal series: %s", val.EntityValues)
		}
		sd.SeriesList = append(sd.SeriesList, &series)
		sd.Timestamps = append(sd.Timestamps, val.Timestamp)
		sd.Versions = append(sd.Versions, val.Version)
		if len(opts.Projection) > 0 {
			sd.Fields = append(sd.Fields, maps.Clone(val.Values))
		}
		sortedValues = append(sortedValues, val.SortedValue)
	}
	if span != nil {
		span.Tagf("query", "%s", iter.Query().String())
		span.Tagf("rounds", "%d", r)
		span.Tagf("size", "%d", len(sd.SeriesList))
	}
	return sd, sortedValues, err
}

// SearchWithoutSeries behaves like Search but relies solely on the secondary
// query in opts, without any series matchers.
func (s *seriesIndex) SearchWithoutSeries(ctx context.Context, opts IndexSearchOpts) (sd SeriesData, sortedValues [][]byte, err error) {
	tracer := query.GetTracer(ctx)
	if tracer != nil {
		var span *query.Span
		span, ctx = tracer.StartSpan(ctx, "seriesIndex.SearchWithoutSeries")
		if opts.Query != nil {
			span.Tagf("secondary_query", "%s", opts.Query.String())
		}
		defer func() {
			if err != nil {
				span.Error(err)
			}
			span.Stop()
		}()
	}
	if opts.Order == nil || opts.Order.Index == nil {
		sd, err = s.filter(ctx, nil, opts.Projection, opts.Query, opts.TimeRange)
		if err != nil {
			return sd, nil, err
		}
		return sd, nil, nil
	}
	var span *query.Span
	if tracer != nil {
		span, _ = tracer.StartSpan(ctx, "sort")
		span.Tagf("preload", "%d", opts.PreloadSize)
		defer func() {
			if err != nil {
				span.Error(err)
			}
			span.Stop()
		}()
	}
	iter, err := s.store.SeriesSort(ctx, opts.Query, opts.Order, opts.PreloadSize, opts.Projection)
	if err != nil {
		return sd, nil, err
	}
	defer func() {
		err = multierr.Append(err, iter.Close())
	}()
	var r int
	for iter.Next() {
		r++
		val := iter.Val()
		var series pbv1.Series
		if err = series.Unmarshal(val.EntityValues); err != nil {
			return sd, nil, errors.WithMessagef(err, "failed to unmarshal series: %s", val.EntityValues)
		}
		sd.SeriesList = append(sd.SeriesList, &series)
		sd.Timestamps = append(sd.Timestamps, val.Timestamp)
		sd.Versions = append(sd.Versions, val.Version)
		if len(opts.Projection) > 0 {
			sd.Fields = append(sd.Fields, maps.Clone(val.Values))
		}
		sortedValues = append(sortedValues, val.SortedValue)
	}
	if span != nil {
		span.Tagf("query", "%s", iter.Query().String())
		span.Tagf("rounds", "%d", r)
		span.Tagf("size", "%d", len(sd.SeriesList))
	}
	return sd, sortedValues, err
}

// Close releases the segment's metrics labels and closes the underlying store.
func (s *seriesIndex) Close() error {
	s.metrics.DeleteAll(s.p.SegLabelValues()...)
	return s.store.Close()
}
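
// Sorted-search sketch (assumed caller code, not part of the original file):
// with opts.Order.Index set, Search streams documents from SeriesSort instead
// of taking the unsorted filter path. The orderBy, projection, and time range
// values below are placeholders.
//
//	sd, sortedValues, err := si.Search(ctx, seriesToMatch, IndexSearchOpts{
//		Order:       orderBy,    // index rule to sort by; nil uses filter() instead
//		Projection:  projection, // field keys to load for each matched series
//		PreloadSize: 1000,       // batch size hint handed to SeriesSort
//		TimeRange:   tr,
//	})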