pkg/query/logical/measure/measure_plan_indexscan_local.go (278 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package measure
import (
"context"
"fmt"
"time"
"google.golang.org/protobuf/types/known/timestamppb"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query"
"github.com/apache/skywalking-banyandb/pkg/query/executor"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
"github.com/apache/skywalking-banyandb/pkg/query/model"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
// Compile-time check that *unresolvedIndexScan satisfies logical.UnresolvedPlan.
var _ logical.UnresolvedPlan = (*unresolvedIndexScan)(nil)
// unresolvedIndexScan holds the raw inputs of an index scan before they are
// resolved against a schema by Analyze.
type unresolvedIndexScan struct {
// startTime and endTime are turned into an inclusive time range by Analyze.
startTime time.Time
endTime time.Time
// metadata identifies the measure; Analyze reads its Name and Group.
metadata *commonv1.Metadata
// criteria is handed to the inverted-index query builders in Analyze.
criteria *modelv1.Criteria
// projectionTags lists the tags to project; each inner slice becomes one
// model.TagProjection in Analyze.
projectionTags [][]*logical.Tag
// projectionFields lists the fields to project; Analyze copies their names.
projectionFields []*logical.Field
// groupByEntity is copied to the resolved plan unchanged.
groupByEntity bool
}
// Analyze resolves the scan against schema s and returns an executable localIndexScan.
//
// It converts the requested tag and field projections into both their model form
// (family/name lists) and schema references, builds the inclusive time range, and
// then builds the index query. For a measure in index mode the query comes from
// inverted.BuildIndexModeQuery and no entities are set; otherwise it comes from
// inverted.BuildQuery together with the entity list.
//
// Any error from reference creation or query building is returned unchanged.
// s must be a *schema; any other implementation makes the type assertion panic.
func (uis *unresolvedIndexScan) Analyze(s logical.Schema) (logical.Plan, error) {
	// Tag projections: one model.TagProjection per requested family slice.
	tagProjections := make([]model.TagProjection, len(uis.projectionTags))
	var tagRefs [][]*logical.TagRef
	if len(uis.projectionTags) > 0 {
		for familyIdx, familyTags := range uis.projectionTags {
			projection := &tagProjections[familyIdx]
			for _, tag := range familyTags {
				projection.Family = tag.GetFamilyName()
				projection.Names = append(projection.Names, tag.GetTagName())
			}
		}
		refs, err := s.CreateTagRef(uis.projectionTags...)
		if err != nil {
			return nil, err
		}
		tagRefs = refs
	}

	// Field projections: plain names plus their schema references.
	var fieldNames []string
	var fieldRefs []*logical.FieldRef
	if len(uis.projectionFields) > 0 {
		for _, field := range uis.projectionFields {
			fieldNames = append(fieldNames, field.Name)
		}
		refs, err := s.CreateFieldRef(uis.projectionFields...)
		if err != nil {
			return nil, err
		}
		fieldRefs = refs
	}

	timeRange := timestamp.NewInclusiveTimeRange(uis.startTime, uis.endTime)
	measureSchema := s.(*schema)

	// Everything that does not depend on the index mode is filled in here; the
	// query, the entities and the logger are set once the query has been built.
	plan := &localIndexScan{
		timeRange:            timeRange,
		schema:               s,
		projectionTags:       tagProjections,
		projectionFields:     fieldNames,
		projectionTagsRefs:   tagRefs,
		projectionFieldsRefs: fieldRefs,
		metadata:             uis.metadata,
		groupByEntity:        uis.groupByEntity,
		uis:                  uis,
	}

	var err error
	if measureSchema.measure.IndexMode {
		plan.query, err = inverted.BuildIndexModeQuery(uis.metadata.Name, uis.criteria, s)
	} else {
		entityNames := s.EntityList()
		entityPositions := make(map[string]int)
		entityTemplate := make([]*modelv1.TagValue, len(entityNames))
		for pos, name := range entityNames {
			entityPositions[name] = pos
			// Every entity slot starts as the "any" tag value.
			entityTemplate[pos] = pbv1.AnyTagValue
		}
		plan.query, plan.entities, _, err = inverted.BuildQuery(uis.criteria, s, entityPositions, entityTemplate)
	}
	if err != nil {
		return nil, err
	}

	plan.l = logger.GetLogger("query", "measure", uis.metadata.Group, uis.metadata.Name, "local-index")
	return plan, nil
}
// Compile-time checks that *localIndexScan satisfies logical.Plan and logical.Sorter.
var (
_ logical.Plan = (*localIndexScan)(nil)
_ logical.Sorter = (*localIndexScan)(nil)
)
// localIndexScan is the resolved index-scan plan produced by
// unresolvedIndexScan.Analyze and run by Execute.
type localIndexScan struct {
// query is the index query built from the criteria.
query index.Query
// schema is the schema the plan was analyzed against.
schema logical.Schema
// uis is the unresolved plan this scan was created from.
uis *unresolvedIndexScan
// order is set through Sort; nil means no explicit ordering was requested.
order *logical.OrderBy
metadata *commonv1.Metadata
l *logger.Logger
timeRange timestamp.TimeRange
// projectionTags and projectionFields are passed to the query as-is;
// the *Refs variants are used to derive the projected schema.
projectionTags []model.TagProjection
projectionTagsRefs [][]*logical.TagRef
projectionFieldsRefs []*logical.FieldRef
// entities is populated only when the measure is not in index mode.
entities [][]*modelv1.TagValue
projectionFields []string
// groupByEntity makes Execute order the result by series.
groupByEntity bool
}
// Sort stores the requested ordering; Execute reads it when building the query options.
func (i *localIndexScan) Sort(order *logical.OrderBy) {
i.order = order
}
// Execute runs the scan through the measure execution context obtained from ctx
// and returns an iterator over the query result.
//
// Ordering is derived from the value set via Sort: with no index rule the result
// is ordered by time, otherwise by that index rule. When groupByEntity is set the
// order type is forced to series ordering, creating an OrderBy if none was requested.
//
// A tracing span is started when ctx carries a tracer; a failure of the query is
// recorded on that span. The returned error wraps the underlying query error.
func (i *localIndexScan) Execute(ctx context.Context) (mit executor.MIterator, err error) {
	var orderBy *index.OrderBy
	if i.order != nil {
		orderBy = &index.OrderBy{
			Sort:  i.order.Sort,
			Index: i.order.Index,
		}
		if orderBy.Index == nil {
			orderBy.Type = index.OrderByTypeTime
		} else {
			orderBy.Type = index.OrderByTypeIndex
		}
	}
	if i.groupByEntity {
		if orderBy == nil {
			orderBy = &index.OrderBy{}
		}
		orderBy.Type = index.OrderByTypeSeries
	}
	ec := executor.FromMeasureExecutionContext(ctx)
	ctx, stop := i.startSpan(ctx, query.GetTracer(ctx), orderBy)
	// Arguments of a deferred call are evaluated when the defer statement executes,
	// so `defer stop(err)` would always pass nil. The closure reads the named
	// result at return time, which lets the span record the actual error.
	defer func() { stop(err) }()
	// err below is the named result (redeclared by :=, not shadowed), and
	// `return nil, <error>` assigns it before the deferred closure runs.
	result, err := ec.Query(ctx, model.MeasureQueryOptions{
		Name:            i.metadata.GetName(),
		TimeRange:       &i.timeRange,
		Entities:        i.entities,
		Query:           i.query,
		Order:           orderBy,
		TagProjection:   i.projectionTags,
		FieldProjection: i.projectionFields,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to query measure: %w", err)
	}
	return &resultMIterator{
		result: result,
	}, nil
}
// String renders a one-line description of the scan: the time range as Unix
// seconds, the measure's group and name, the index query, the projected tag
// references and the requested order.
func (i *localIndexScan) String() string {
	startSec := i.timeRange.Start.Unix()
	endSec := i.timeRange.End.Unix()
	group := i.metadata.GetGroup()
	name := i.metadata.GetName()
	projection := logical.FormatTagRefs(", ", i.projectionTagsRefs...)
	return fmt.Sprintf(
		"IndexScan: startTime=%d,endTime=%d,Metadata{group=%s,name=%s},conditions=%s; projection=%s; order=%s;",
		startSec, endSec, group, name, i.query, projection, i.order,
	)
}
// Children returns an empty, non-nil slice: an index scan has no child plans.
func (i *localIndexScan) Children() []logical.Plan {
	return make([]logical.Plan, 0)
}
// Schema returns the schema of the rows this plan produces. When tag projections
// are present, the base schema is narrowed to the projected tags and then to the
// projected fields; otherwise the base schema is returned as-is.
func (i *localIndexScan) Schema() logical.Schema {
	if len(i.projectionTagsRefs) > 0 {
		withTags := i.schema.ProjTags(i.projectionTagsRefs...)
		return withTags.ProjFields(i.projectionFieldsRefs...)
	}
	return i.schema
}
// indexScan creates an unresolved index-scan plan from the given time bounds,
// measure metadata, tag and field projections, group-by-entity flag and criteria.
// The arguments are stored as given; nothing is validated here.
func indexScan(startTime, endTime time.Time, metadata *commonv1.Metadata, projectionTags [][]*logical.Tag,
	projectionFields []*logical.Field, groupByEntity bool, criteria *modelv1.Criteria,
) logical.UnresolvedPlan {
	plan := new(unresolvedIndexScan)
	plan.startTime, plan.endTime = startTime, endTime
	plan.metadata = metadata
	plan.criteria = criteria
	plan.projectionTags = projectionTags
	plan.projectionFields = projectionFields
	plan.groupByEntity = groupByEntity
	return plan
}
// resultMIterator iterates over a model.MeasureQueryResult one data point at a time.
type resultMIterator struct {
// result is the underlying query result; Next pulls batches from it and Close releases it.
result model.MeasureQueryResult
// err holds the error carried by a pulled batch, if any; Close returns it.
err error
// current holds the data points built from the most recently pulled batch.
current []*measurev1.DataPoint
// i is the index into current of the data point returned by Current.
i int
}
// Next advances the iterator to the next data point and reports whether one is
// available through Current.
//
// Data points already built from the last pulled batch are served first. Once
// they are used up, a new batch is pulled from the underlying result and
// converted into data points, one per timestamp. Timestamps are read as Unix
// nanoseconds, and each tag or field contributes the value at the matching index.
//
// Next returns false when there is no underlying result, when Pull returns nil,
// or when the pulled batch carries an error. That error is kept and returned by
// Close.
func (ei *resultMIterator) Next() bool {
	if ei.result == nil {
		return false
	}
	ei.i++
	if ei.i < len(ei.current) {
		return true
	}
	// Keep pulling until a batch yields at least one data point. Returning true
	// for a batch without timestamps would leave current empty and make the
	// following Current call index out of range.
	for {
		r := ei.result.Pull()
		if r == nil {
			return false
		}
		if r.Error != nil {
			ei.err = r.Error
			return false
		}
		// Reuse the backing array of current for the new batch.
		ei.current = ei.current[:0]
		ei.i = 0
		for idx := range r.Timestamps {
			dp := &measurev1.DataPoint{
				Timestamp: timestamppb.New(time.Unix(0, r.Timestamps[idx])),
				Sid:       uint64(r.SID),
				Version:   r.Versions[idx],
			}
			for _, tf := range r.TagFamilies {
				tagFamily := &modelv1.TagFamily{
					Name: tf.Name,
				}
				dp.TagFamilies = append(dp.TagFamilies, tagFamily)
				for _, t := range tf.Tags {
					tagFamily.Tags = append(tagFamily.Tags, &modelv1.Tag{
						Key:   t.Name,
						Value: t.Values[idx],
					})
				}
			}
			for _, f := range r.Fields {
				dp.Fields = append(dp.Fields, &measurev1.DataPoint_Field{
					Name:  f.Name,
					Value: f.Values[idx],
				})
			}
			ei.current = append(ei.current, dp)
		}
		if len(ei.current) > 0 {
			return true
		}
	}
}
// Current returns the data point at the iterator's position wrapped in a fresh
// single-element slice. It indexes current directly, so it must only be called
// after Next has returned true.
func (ei *resultMIterator) Current() []*measurev1.DataPoint {
	dp := ei.current[ei.i]
	return []*measurev1.DataPoint{dp}
}
// Close releases the underlying result, if there is one, and returns the error
// recorded during iteration (nil when none was recorded).
func (ei *resultMIterator) Close() error {
	if ei.result == nil {
		return ei.err
	}
	ei.result.Release()
	return ei.err
}
// startSpan opens a tracing span for this scan and returns the context produced
// by the tracer together with a function that ends the span.
//
// With a nil tracer the original context and a no-op function are returned.
// Otherwise the span is tagged with an "orderBy" description (time, index rule or
// series; "time-asc(default)" when no ordering is given) and with "details" set to
// the plan's String form. The returned function records a non-nil error on the
// span before stopping it.
func (i *localIndexScan) startSpan(ctx context.Context, tracer *query.Tracer, orderBy *index.OrderBy) (context.Context, func(error)) {
	if tracer == nil {
		return ctx, func(error) {}
	}
	span, spanCtx := tracer.StartSpan(ctx, "indexScan-%s", i.metadata)
	switch {
	case orderBy == nil:
		span.Tag("orderBy", "time-asc(default)")
	case orderBy.Type == index.OrderByTypeTime:
		span.Tag("orderBy", "time-"+modelv1.Sort_name[int32(orderBy.Sort)])
	case orderBy.Type == index.OrderByTypeIndex:
		direction := modelv1.Sort_name[int32(orderBy.Sort)]
		span.Tag("orderBy", fmt.Sprintf("indexRule:%s-%s", orderBy.Index.Metadata.Name, direction))
	case orderBy.Type == index.OrderByTypeSeries:
		span.Tag("orderBy", "series")
	}
	span.Tag("details", i.String())
	finish := func(err error) {
		if err != nil {
			span.Error(err)
		}
		span.Stop()
	}
	return spanCtx, finish
}