pkg/query/logical/measure/measure_plan_indexscan_local.go (306 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package measure import ( "context" "fmt" "io" "time" "go.uber.org/multierr" "google.golang.org/protobuf/types/known/timestamppb" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/query/executor" "github.com/apache/skywalking-banyandb/pkg/query/logical" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) var _ logical.UnresolvedPlan = (*unresolvedIndexScan)(nil) type unresolvedIndexScan struct { startTime time.Time endTime time.Time metadata *commonv1.Metadata criteria *modelv1.Criteria projectionTags [][]*logical.Tag projectionFields []*logical.Field groupByEntity bool } func (uis *unresolvedIndexScan) Analyze(s logical.Schema) (logical.Plan, error) { var projTagsRefs [][]*logical.TagRef if len(uis.projectionTags) > 0 { var err error projTagsRefs, err = s.CreateTagRef(uis.projectionTags...) if err != nil { return nil, err } } var projFieldRefs []*logical.FieldRef if len(uis.projectionFields) > 0 { var err error projFieldRefs, err = s.CreateFieldRef(uis.projectionFields...) if err != nil { return nil, err } } entityList := s.EntityList() entityMap := make(map[string]int) entity := make([]tsdb.Entry, len(entityList)) for idx, e := range entityList { entityMap[e] = idx // fill AnyEntry by default entity[idx] = tsdb.AnyEntry } filter, entities, err := logical.BuildLocalFilter(uis.criteria, s, entityMap, entity) if err != nil { return nil, err } return &localIndexScan{ timeRange: timestamp.NewInclusiveTimeRange(uis.startTime, uis.endTime), schema: s, projectionTagsRefs: projTagsRefs, projectionFieldsRefs: projFieldRefs, metadata: uis.metadata, filter: filter, entities: entities, groupByEntity: uis.groupByEntity, l: logger.GetLogger("query", "measure", uis.metadata.Group, uis.metadata.Name, "local-index"), }, nil } var ( _ logical.Plan = (*localIndexScan)(nil) _ logical.Sorter = (*localIndexScan)(nil) _ logical.VolumeLimiter = (*localIndexScan)(nil) ) type localIndexScan struct { schema logical.Schema filter index.Filter order *logical.OrderBy metadata *commonv1.Metadata l *logger.Logger timeRange timestamp.TimeRange projectionTagsRefs [][]*logical.TagRef projectionFieldsRefs []*logical.FieldRef entities []tsdb.Entity groupByEntity bool maxDataPointsSize int } func (i *localIndexScan) Limit(max int) { i.maxDataPointsSize = max } func (i *localIndexScan) Sort(order *logical.OrderBy) { i.order = order } func (i *localIndexScan) Execute(ec executor.MeasureExecutionContext) (mit executor.MIterator, err error) { var orderBy *tsdb.OrderBy if i.order.Index != nil { orderBy = &tsdb.OrderBy{ Index: i.order.Index, Sort: i.order.Sort, } } var seriesList tsdb.SeriesList for _, e := range i.entities { shards, errInternal := ec.Shards(e) if errInternal != nil { return nil, errInternal } for _, shard := range shards { sl, errInternal := shard.Series().Search( context.WithValue( context.Background(), logger.ContextKey, i.l, ), tsdb.NewPath(e), i.filter, orderBy, ) if errInternal != nil { return nil, errInternal } seriesList = seriesList.Merge(sl) } } if len(seriesList) == 0 { return dummyIter, nil } var builders []logical.SeekerBuilder if i.order.Index == nil { builders = append(builders, func(builder tsdb.SeekerBuilder) { builder.OrderByTime(i.order.Sort) }) } // CAVEAT: the order of series list matters when sorting by an index. iters, closers, err := logical.ExecuteForShard(i.l, seriesList, i.timeRange, builders...) if err != nil { return nil, err } if len(closers) > 0 { defer func(closers []io.Closer) { for _, c := range closers { err = multierr.Append(err, c.Close()) } }(closers) } if len(iters) == 0 { return dummyIter, nil } transformContext := transformContext{ ec: ec, projectionTagsRefs: i.projectionTagsRefs, projectionFieldsRefs: i.projectionFieldsRefs, } if i.groupByEntity { return newSeriesMIterator(iters, transformContext, i.maxDataPointsSize), nil } it := logical.NewItemIter(iters, i.order.Sort) return newIndexScanIterator(it, transformContext, i.maxDataPointsSize), nil } func (i *localIndexScan) String() string { return fmt.Sprintf("IndexScan: startTime=%d,endTime=%d,Metadata{group=%s,name=%s},conditions=%s; projection=%s; order=%s; limit=%d", i.timeRange.Start.Unix(), i.timeRange.End.Unix(), i.metadata.GetGroup(), i.metadata.GetName(), i.filter, logical.FormatTagRefs(", ", i.projectionTagsRefs...), i.order, i.maxDataPointsSize) } func (i *localIndexScan) Children() []logical.Plan { return []logical.Plan{} } func (i *localIndexScan) Schema() logical.Schema { if len(i.projectionTagsRefs) == 0 { return i.schema } return i.schema.ProjTags(i.projectionTagsRefs...).ProjFields(i.projectionFieldsRefs...) } func indexScan(startTime, endTime time.Time, metadata *commonv1.Metadata, projectionTags [][]*logical.Tag, projectionFields []*logical.Field, groupByEntity bool, criteria *modelv1.Criteria, ) logical.UnresolvedPlan { return &unresolvedIndexScan{ startTime: startTime, endTime: endTime, metadata: metadata, projectionTags: projectionTags, projectionFields: projectionFields, groupByEntity: groupByEntity, criteria: criteria, } } var _ executor.MIterator = (*indexScanIterator)(nil) type indexScanIterator struct { inner logical.ItemIterator err error current *measurev1.DataPoint context transformContext max int num int } func newIndexScanIterator(inner logical.ItemIterator, context transformContext, max int) executor.MIterator { return &indexScanIterator{ inner: inner, context: context, max: max, } } func (ism *indexScanIterator) Next() bool { if !ism.inner.HasNext() || ism.err != nil || ism.num > ism.max { return false } nextItem := ism.inner.Next() var err error if ism.current, err = transform(nextItem, ism.context); err != nil { ism.err = multierr.Append(ism.err, err) } ism.num++ return true } func (ism *indexScanIterator) Current() []*measurev1.DataPoint { if ism.current == nil { return nil } return []*measurev1.DataPoint{ism.current} } func (ism *indexScanIterator) Close() error { return multierr.Combine(ism.err, ism.inner.Close()) } var _ executor.MIterator = (*seriesIterator)(nil) type seriesIterator struct { err error context transformContext inner []tsdb.Iterator current []*measurev1.DataPoint index int num int max int } func newSeriesMIterator(inner []tsdb.Iterator, context transformContext, max int) executor.MIterator { return &seriesIterator{ inner: inner, context: context, index: -1, max: max, } } func (ism *seriesIterator) Next() bool { if ism.err != nil || ism.num > ism.max { return false } ism.index++ if ism.index >= len(ism.inner) { return false } iter := ism.inner[ism.index] if ism.current != nil { ism.current = ism.current[:0] } for iter.Next() { dp, err := transform(iter.Val(), ism.context) if err != nil { ism.err = err return false } ism.current = append(ism.current, dp) } ism.num++ return true } func (ism *seriesIterator) Current() []*measurev1.DataPoint { return ism.current } func (ism *seriesIterator) Close() error { for _, i := range ism.inner { ism.err = multierr.Append(ism.err, i.Close()) } return ism.err } type transformContext struct { ec executor.MeasureExecutionContext projectionTagsRefs [][]*logical.TagRef projectionFieldsRefs []*logical.FieldRef } func transform(item tsdb.Item, ism transformContext) (*measurev1.DataPoint, error) { tagFamilies, err := logical.ProjectItem(ism.ec, item, ism.projectionTagsRefs) if err != nil { return nil, err } dpFields := make([]*measurev1.DataPoint_Field, 0) for _, f := range ism.projectionFieldsRefs { fieldVal, parserFieldErr := ism.ec.ParseField(f.Field.Name, item) if parserFieldErr != nil { return nil, parserFieldErr } dpFields = append(dpFields, fieldVal) } return &measurev1.DataPoint{ Fields: dpFields, TagFamilies: tagFamilies, Timestamp: timestamppb.New(time.Unix(0, int64(item.Time()))), }, nil } var dummyIter = dummyMIterator{} type dummyMIterator struct{} func (ei dummyMIterator) Next() bool { return false } func (ei dummyMIterator) Current() []*measurev1.DataPoint { return nil } func (ei dummyMIterator) Close() error { return nil }