pkg/query/logical/measure/measure_plan_indexscan_local.go (306 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
package measure
import (
	"context"
	"fmt"
	"io"
	"time"
	"go.uber.org/multierr"
	"google.golang.org/protobuf/types/known/timestamppb"
	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
	"github.com/apache/skywalking-banyandb/banyand/tsdb"
	"github.com/apache/skywalking-banyandb/pkg/index"
	"github.com/apache/skywalking-banyandb/pkg/logger"
	"github.com/apache/skywalking-banyandb/pkg/query/executor"
	"github.com/apache/skywalking-banyandb/pkg/query/logical"
	"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
var _ logical.UnresolvedPlan = (*unresolvedIndexScan)(nil)
// unresolvedIndexScan is the index-scan plan as built from a query, before it
// is bound to a schema. Analyze resolves it into a localIndexScan.
type unresolvedIndexScan struct {
	// startTime and endTime bound the scan; Analyze turns them into an
	// inclusive time range.
	startTime        time.Time
	endTime          time.Time
	// metadata identifies the measure (its group and name).
	metadata         *commonv1.Metadata
	// criteria is the query filter; Analyze derives the index filter and the
	// entities to search from it.
	criteria         *modelv1.Criteria
	// projectionTags and projectionFields list the requested tags and fields.
	// They are only resolved into schema references when non-empty.
	projectionTags   [][]*logical.Tag
	projectionFields []*logical.Field
	// groupByEntity makes the executed scan emit data points series by series
	// instead of as one merged stream.
	groupByEntity    bool
}
// Analyze resolves the scan against schema s and produces the executable
// localIndexScan.
//
// Requested tag and field projections (when present) are turned into schema
// references. An entity template is then built with tsdb.AnyEntry at every
// position of s.EntityList(), together with a name-to-position map.
// logical.BuildLocalFilter combines both with the query criteria to derive the
// index filter and the entities the scan will search.
func (uis *unresolvedIndexScan) Analyze(s logical.Schema) (logical.Plan, error) {
	var (
		tagRefs   [][]*logical.TagRef
		fieldRefs []*logical.FieldRef
		err       error
	)
	if len(uis.projectionTags) > 0 {
		if tagRefs, err = s.CreateTagRef(uis.projectionTags...); err != nil {
			return nil, err
		}
	}
	if len(uis.projectionFields) > 0 {
		if fieldRefs, err = s.CreateFieldRef(uis.projectionFields...); err != nil {
			return nil, err
		}
	}

	entityNames := s.EntityList()
	positions := make(map[string]int, len(entityNames))
	template := make([]tsdb.Entry, len(entityNames))
	for pos, name := range entityNames {
		positions[name] = pos
		// Every position defaults to AnyEntry.
		template[pos] = tsdb.AnyEntry
	}
	filter, entities, err := logical.BuildLocalFilter(uis.criteria, s, positions, template)
	if err != nil {
		return nil, err
	}

	scan := &localIndexScan{
		schema:               s,
		metadata:             uis.metadata,
		timeRange:            timestamp.NewInclusiveTimeRange(uis.startTime, uis.endTime),
		filter:               filter,
		entities:             entities,
		projectionTagsRefs:   tagRefs,
		projectionFieldsRefs: fieldRefs,
		groupByEntity:        uis.groupByEntity,
		l:                    logger.GetLogger("query", "measure", uis.metadata.Group, uis.metadata.Name, "local-index"),
	}
	return scan, nil
}
var (
	_ logical.Plan          = (*localIndexScan)(nil)
	_ logical.Sorter        = (*localIndexScan)(nil)
	_ logical.VolumeLimiter = (*localIndexScan)(nil)
)
// localIndexScan is the resolved index-scan plan. For each entity it searches
// every shard for series matching filter and reads them over timeRange.
type localIndexScan struct {
	// schema is the schema the plan was analyzed against.
	schema               logical.Schema
	// filter is the index filter passed to each series search.
	filter               index.Filter
	// order is set through Sort. Its Index (if any) is passed to the series
	// search, and its Sort direction orders the emitted items.
	order                *logical.OrderBy
	// metadata identifies the measure (group and name); used by String.
	metadata             *commonv1.Metadata
	// l is the plan's logger, attached to the search context.
	l                    *logger.Logger
	// timeRange bounds the data read from each matching series.
	timeRange            timestamp.TimeRange
	// projectionTagsRefs and projectionFieldsRefs are the resolved
	// projections applied when transforming items into data points.
	projectionTagsRefs   [][]*logical.TagRef
	projectionFieldsRefs []*logical.FieldRef
	// entities are searched one by one, on every shard returned for each.
	entities             []tsdb.Entity
	// groupByEntity selects per-series output instead of one merged stream.
	groupByEntity        bool
	// maxDataPointsSize is set through Limit and handed to the result iterator.
	maxDataPointsSize    int
}
// Limit implements logical.VolumeLimiter by recording size as the cap that
// Execute hands to the result iterator.
func (i *localIndexScan) Limit(size int) {
	i.maxDataPointsSize = size
}

// Sort implements logical.Sorter by recording the requested ordering. Execute
// uses its Index (when set) for the series search and its Sort direction for
// the emitted items.
func (i *localIndexScan) Sort(o *logical.OrderBy) {
	i.order = o
}
// Execute runs the scan against the shards exposed by ec and returns an
// iterator over the matching data points.
//
// Steps:
//   - For every entity resolved in Analyze, each of its shards is searched for
//     series matching i.filter, and the results are merged into one list.
//   - When the ordering names an index, the search is asked to order series by
//     it; otherwise every seeker is configured to order by time.
//   - The series are read over i.timeRange and wrapped in a per-series
//     iterator (groupByEntity) or in a single merged stream ordered by the
//     requested sort direction.
//
// An empty iterator is returned when no series or no iterators are found.
// The closers returned by logical.ExecuteForShard are closed when Execute
// returns. Any error from closing them is appended to err, so a non-nil
// iterator can be returned together with a non-nil error.
func (i *localIndexScan) Execute(ec executor.MeasureExecutionContext) (mit executor.MIterator, err error) {
	// Sort is expected to be called before Execute. Fall back to the zero
	// ordering (no index, zero sort value) rather than dereferencing nil.
	order := i.order
	if order == nil {
		order = &logical.OrderBy{}
	}
	var orderBy *tsdb.OrderBy
	if order.Index != nil {
		orderBy = &tsdb.OrderBy{
			Index: order.Index,
			Sort:  order.Sort,
		}
	}
	// The search context only carries the plan logger and is immutable, so it
	// is built once instead of once per shard.
	searchCtx := context.WithValue(context.Background(), logger.ContextKey, i.l)
	var seriesList tsdb.SeriesList
	for _, e := range i.entities {
		shards, shardsErr := ec.Shards(e)
		if shardsErr != nil {
			return nil, shardsErr
		}
		for _, shard := range shards {
			sl, searchErr := shard.Series().Search(searchCtx, tsdb.NewPath(e), i.filter, orderBy)
			if searchErr != nil {
				return nil, searchErr
			}
			seriesList = seriesList.Merge(sl)
		}
	}
	if len(seriesList) == 0 {
		return dummyIter, nil
	}
	var builders []logical.SeekerBuilder
	if order.Index == nil {
		builders = append(builders, func(builder tsdb.SeekerBuilder) {
			builder.OrderByTime(order.Sort)
		})
	}
	// CAVEAT: the order of series list matters when sorting by an index.
	iters, closers, err := logical.ExecuteForShard(i.l, seriesList, i.timeRange, builders...)
	if err != nil {
		return nil, err
	}
	// NOTE(review): the closers are released when Execute returns, while the
	// iterators built alongside them are consumed by the caller afterwards.
	// Confirm the iterators stay valid once their closers have been closed.
	if len(closers) > 0 {
		defer func(closers []io.Closer) {
			for _, c := range closers {
				err = multierr.Append(err, c.Close())
			}
		}(closers)
	}
	if len(iters) == 0 {
		return dummyIter, nil
	}
	tc := transformContext{
		ec:                   ec,
		projectionTagsRefs:   i.projectionTagsRefs,
		projectionFieldsRefs: i.projectionFieldsRefs,
	}
	if i.groupByEntity {
		return newSeriesMIterator(iters, tc, i.maxDataPointsSize), nil
	}
	return newIndexScanIterator(logical.NewItemIter(iters, order.Sort), tc, i.maxDataPointsSize), nil
}
// String renders the scan for plan explanations. It includes:
//   - the time range as Unix seconds,
//   - the measure group and name,
//   - the filter, the projected tags, the ordering and the data-point limit.
func (i *localIndexScan) String() string {
	start, end := i.timeRange.Start.Unix(), i.timeRange.End.Unix()
	group, name := i.metadata.GetGroup(), i.metadata.GetName()
	projection := logical.FormatTagRefs(", ", i.projectionTagsRefs...)
	return fmt.Sprintf(
		"IndexScan: startTime=%d,endTime=%d,Metadata{group=%s,name=%s},conditions=%s; projection=%s; order=%s; limit=%d",
		start, end, group, name,
		i.filter, projection, i.order, i.maxDataPointsSize,
	)
}
// Children returns an empty, non-nil list: the index scan is a leaf plan.
func (i *localIndexScan) Children() []logical.Plan {
	return make([]logical.Plan, 0)
}

// Schema returns the schema produced by the scan. When tags are projected,
// the base schema is narrowed to those tags and then to the projected fields.
// Otherwise the base schema is returned unchanged.
//
// NOTE(review): a projection listing only fields is therefore not reflected in
// the returned schema — confirm this is intended.
func (i *localIndexScan) Schema() logical.Schema {
	if len(i.projectionTagsRefs) != 0 {
		withTags := i.schema.ProjTags(i.projectionTagsRefs...)
		return withTags.ProjFields(i.projectionFieldsRefs...)
	}
	return i.schema
}
// indexScan builds the unresolved index-scan plan for a measure query. The
// arguments are stored unchanged; Analyze later resolves them against a
// schema.
func indexScan(startTime, endTime time.Time, metadata *commonv1.Metadata, projectionTags [][]*logical.Tag,
	projectionFields []*logical.Field, groupByEntity bool, criteria *modelv1.Criteria,
) logical.UnresolvedPlan {
	plan := new(unresolvedIndexScan)
	plan.startTime, plan.endTime = startTime, endTime
	plan.metadata = metadata
	plan.criteria = criteria
	plan.projectionTags, plan.projectionFields = projectionTags, projectionFields
	plan.groupByEntity = groupByEntity
	return plan
}
var _ executor.MIterator = (*indexScanIterator)(nil)
// indexScanIterator turns an item stream into data points, transforming one
// item per Next call.
type indexScanIterator struct {
	// inner supplies the items to transform.
	inner   logical.ItemIterator
	// err accumulates transform errors; Close reports it.
	err     error
	// current is the data point produced by the latest Next (nil if none).
	current *measurev1.DataPoint
	// context carries the execution context and projections used by transform.
	context transformContext
	// max caps how many items Next consumes; num counts the items consumed.
	max     int
	num     int
}
// newIndexScanIterator adapts an item stream into an executor.MIterator that
// transforms items one at a time with tc. limit is the item cap checked by
// Next.
func newIndexScanIterator(inner logical.ItemIterator, tc transformContext, limit int) executor.MIterator {
	it := new(indexScanIterator)
	it.inner = inner
	it.context = tc
	it.max = limit
	return it
}
// Next advances to the next item of the stream and transforms it into the
// data point exposed by Current.
//
// It returns false when:
//   - a previous step failed,
//   - max data points have already been emitted (a non-positive max, e.g.
//     when Limit was never called, disables the cap),
//   - the inner iterator is exhausted.
//
// If transforming the item fails, the error is recorded for Close and false is
// returned, so a true result always comes with a data point.
func (ism *indexScanIterator) Next() bool {
	// num counts emitted data points, so stop as soon as it reaches max. The
	// cap is checked before HasNext so no further item is pulled once reached.
	if ism.err != nil || (ism.max > 0 && ism.num >= ism.max) || !ism.inner.HasNext() {
		return false
	}
	dp, err := transform(ism.inner.Next(), ism.context)
	if err != nil {
		ism.current = nil
		ism.err = multierr.Append(ism.err, err)
		return false
	}
	ism.current = dp
	ism.num++
	return true
}
// Current returns the data point produced by the most recent Next as a
// one-element batch, or nil when there is none.
func (ism *indexScanIterator) Current() []*measurev1.DataPoint {
	if dp := ism.current; dp != nil {
		return []*measurev1.DataPoint{dp}
	}
	return nil
}

// Close closes the inner item iterator. It reports that close error combined
// with any error recorded while transforming items.
func (ism *indexScanIterator) Close() error {
	recorded := ism.err
	return multierr.Combine(recorded, ism.inner.Close())
}
var _ executor.MIterator = (*seriesIterator)(nil)
// seriesIterator emits data points one series at a time: each Next drains the
// next inner iterator and exposes all its transformed items as one batch.
type seriesIterator struct {
	// err holds the first transform error; Close also folds close errors in.
	err     error
	context transformContext
	// inner holds the iterators to drain, one per Next call.
	inner   []tsdb.Iterator
	// current is the batch built by the latest Next.
	current []*measurev1.DataPoint
	// index is the position in inner of the iterator currently exposed.
	index   int
	// num counts the batches emitted so far; max caps it.
	num     int
	max     int
}
// newSeriesMIterator wraps the given iterators so that each Next yields the
// transformed items of one of them. limit is the batch cap checked by Next.
func newSeriesMIterator(inner []tsdb.Iterator, tc transformContext, limit int) executor.MIterator {
	it := &seriesIterator{inner: inner, context: tc, max: limit}
	// Nothing is selected yet: the first Next moves the cursor to position 0.
	it.index = -1
	return it
}
// Next moves to the next inner iterator and drains it, transforming every
// item into a data point. The resulting batch is exposed by Current.
//
// It returns false when:
//   - a transform error occurred (kept in err and reported by Close),
//   - max batches have already been emitted (a non-positive max, e.g. when
//     Limit was never called, disables the cap),
//   - no iterator is left.
func (ism *seriesIterator) Next() bool {
	// num counts emitted batches, so stop as soon as it reaches max.
	if ism.err != nil || (ism.max > 0 && ism.num >= ism.max) {
		return false
	}
	ism.index++
	if ism.index >= len(ism.inner) {
		return false
	}
	iter := ism.inner[ism.index]
	// Collect into a fresh slice rather than truncating the previous batch.
	// Reusing its backing array would overwrite data points a caller may still
	// hold from an earlier Current call.
	var batch []*measurev1.DataPoint
	for iter.Next() {
		dp, err := transform(iter.Val(), ism.context)
		if err != nil {
			ism.current = nil
			ism.err = err
			return false
		}
		batch = append(batch, dp)
	}
	ism.current = batch
	ism.num++
	return true
}
// Current returns the batch of data points built by the latest Next.
func (ism *seriesIterator) Current() []*measurev1.DataPoint {
	batch := ism.current
	return batch
}

// Close closes every inner iterator, even when some of them fail. Their errors
// are folded into the recorded error, which is returned.
func (ism *seriesIterator) Close() error {
	for idx := range ism.inner {
		closeErr := ism.inner[idx].Close()
		ism.err = multierr.Append(ism.err, closeErr)
	}
	return ism.err
}
// transformContext bundles what transform needs to turn a tsdb item into a
// measure data point: the execution context and the tag/field projections.
type transformContext struct {
	// ec is used for tag projection (via logical.ProjectItem) and field parsing.
	ec                   executor.MeasureExecutionContext
	// projectionTagsRefs and projectionFieldsRefs select which tags and fields
	// are copied into the data point.
	projectionTagsRefs   [][]*logical.TagRef
	projectionFieldsRefs []*logical.FieldRef
}
// transform converts a tsdb item into a measure data point using the
// projections carried by tc. The data point is built from:
//   - tag families from logical.ProjectItem,
//   - one field per projected field reference, parsed by the execution
//     context, in projection order,
//   - a timestamp from item.Time(), interpreted as nanoseconds since the Unix
//     epoch.
//
// Errors from tag projection or field parsing are returned wrapped with
// context; no partial data point is returned.
func transform(item tsdb.Item, tc transformContext) (*measurev1.DataPoint, error) {
	tagFamilies, err := logical.ProjectItem(tc.ec, item, tc.projectionTagsRefs)
	if err != nil {
		return nil, fmt.Errorf("project tags: %w", err)
	}
	// The number of fields is known up front, so size the slice once.
	dpFields := make([]*measurev1.DataPoint_Field, 0, len(tc.projectionFieldsRefs))
	for _, f := range tc.projectionFieldsRefs {
		fieldVal, parseErr := tc.ec.ParseField(f.Field.Name, item)
		if parseErr != nil {
			return nil, fmt.Errorf("parse field %q: %w", f.Field.Name, parseErr)
		}
		dpFields = append(dpFields, fieldVal)
	}
	return &measurev1.DataPoint{
		Fields:      dpFields,
		TagFamilies: tagFamilies,
		Timestamp:   timestamppb.New(time.Unix(0, int64(item.Time()))),
	}, nil
}
// dummyIter is the shared empty result returned when the scan finds nothing.
var dummyIter dummyMIterator

// dummyMIterator is an executor.MIterator that never yields anything.
type dummyMIterator struct{}

// Next always reports that there is nothing to iterate.
func (dummyMIterator) Next() bool { return false }

// Current always returns a nil batch.
func (dummyMIterator) Current() []*measurev1.DataPoint { return nil }

// Close has nothing to release and never fails.
func (dummyMIterator) Close() error { return nil }