in banyand/internal/storage/index.go [222:299]
func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts IndexSearchOpts,
) (sd SeriesData, sortedValues [][]byte, err error) {
tracer := query.GetTracer(ctx)
if tracer != nil {
var span *query.Span
span, ctx = tracer.StartSpan(ctx, "seriesIndex.Search")
if opts.Query != nil {
span.Tagf("secondary_query", "%s", opts.Query.String())
}
defer func() {
if err != nil {
span.Error(err)
}
span.Stop()
}()
}
if opts.Order == nil || opts.Order.Index == nil {
sd, err = s.filter(ctx, series, opts.Projection, opts.Query, opts.TimeRange)
if err != nil {
return sd, nil, err
}
return sd, nil, nil
}
var span *query.Span
if tracer != nil {
span, _ = tracer.StartSpan(ctx, "sort")
span.Tagf("preload", "%d", opts.PreloadSize)
defer func() {
if err != nil {
span.Error(err)
}
span.Stop()
}()
}
seriesMatchers := make([]index.SeriesMatcher, len(series))
for i := range series {
seriesMatchers[i], err = convertEntityValuesToSeriesMatcher(series[i])
if err != nil {
return sd, nil, err
}
}
query, err := s.store.BuildQuery(seriesMatchers, opts.Query, opts.TimeRange)
if err != nil {
return sd, nil, err
}
iter, err := s.store.SeriesSort(ctx, query, opts.Order,
opts.PreloadSize, opts.Projection)
if err != nil {
return sd, nil, err
}
defer func() {
err = multierr.Append(err, iter.Close())
}()
var r int
for iter.Next() {
r++
val := iter.Val()
var series pbv1.Series
if err = series.Unmarshal(val.EntityValues); err != nil {
return sd, nil, errors.WithMessagef(err, "failed to unmarshal series: %s", val.EntityValues)
}
sd.SeriesList = append(sd.SeriesList, &series)
sd.Timestamps = append(sd.Timestamps, val.Timestamp)
sd.Versions = append(sd.Versions, val.Version)
if len(opts.Projection) > 0 {
sd.Fields = append(sd.Fields, maps.Clone(val.Values))
}
sortedValues = append(sortedValues, val.SortedValue)
}
if span != nil {
span.Tagf("query", "%s", iter.Query().String())
span.Tagf("rounds", "%d", r)
span.Tagf("size", "%d", len(sd.SeriesList))
}
return sd, sortedValues, err
}