pkg/query/logical/measure/measure_analyzer.go

// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package measure

import (
	"context"
	"math"

	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
	"github.com/apache/skywalking-banyandb/pkg/query/logical"
)

const defaultLimit uint32 = 100

// BuildSchema returns Schema loaded from the metadata repository.
func BuildSchema(md *databasev1.Measure, indexRules []*databasev1.IndexRule) (logical.Schema, error) {
	md.GetEntity()

	ms := &schema{
		common: &logical.CommonSchema{
			IndexRules: indexRules,
			TagSpecMap: make(map[string]*logical.TagSpec),
			EntityList: md.GetEntity().GetTagNames(),
		},
		measure:  md,
		fieldMap: make(map[string]*logical.FieldSpec),
	}

	ms.common.RegisterTagFamilies(md.GetTagFamilies())

	for fieldIdx, spec := range md.GetFields() {
		ms.registerField(fieldIdx, spec)
	}

	return ms, nil
}

// Analyze converts logical expressions to executable operation tree represented by Plan.
func Analyze(_ context.Context, criteria *measurev1.QueryRequest, metadata *commonv1.Metadata, s logical.Schema) (logical.Plan, error) {
	groupByEntity := false
	var groupByTags [][]*logical.Tag
	if criteria.GetGroupBy() != nil {
		groupByProjectionTags := criteria.GetGroupBy().GetTagProjection()
		groupByTags = make([][]*logical.Tag, len(groupByProjectionTags.GetTagFamilies()))
		tags := make([]string, 0)
		for i, tagFamily := range groupByProjectionTags.GetTagFamilies() {
			groupByTags[i] = logical.NewTags(tagFamily.GetName(), tagFamily.GetTags()...)
			tags = append(tags, tagFamily.GetTags()...)
		}
		if logical.StringSlicesEqual(s.EntityList(), tags) {
			groupByEntity = true
		}
	}

	// parse fields
	plan := parseFields(criteria, metadata, groupByEntity)

	// parse limit and offset
	limitParameter := criteria.GetLimit()
	if limitParameter == 0 {
		limitParameter = defaultLimit
	}
	pushedLimit := int(limitParameter + criteria.GetOffset())

	if criteria.GetGroupBy() != nil {
		plan = newUnresolvedGroupBy(plan, groupByTags, groupByEntity)
		pushedLimit = math.MaxInt
	}

	if criteria.GetAgg() != nil {
		plan = newUnresolvedAggregation(plan,
			logical.NewField(criteria.GetAgg().GetFieldName()),
			criteria.GetAgg().GetFunction(),
			criteria.GetGroupBy() != nil,
		)
		pushedLimit = math.MaxInt
	}

	if criteria.GetTop() != nil {
		plan = top(plan, criteria.GetTop())
	}

	plan = limit(plan, criteria.GetOffset(), limitParameter)
	p, err := plan.Analyze(s)
	if err != nil {
		return nil, err
	}
	rules := []logical.OptimizeRule{
		logical.NewPushDownOrder(criteria.OrderBy),
		logical.NewPushDownMaxSize(pushedLimit),
	}
	if err := logical.ApplyRules(p, rules...); err != nil {
		return nil, err
	}
	return p, nil
}

// DistributedAnalyze converts logical expressions to executable operation tree represented by Plan.
func DistributedAnalyze(criteria *measurev1.QueryRequest, s logical.Schema) (logical.Plan, error) {
	var groupByTags [][]*logical.Tag
	if criteria.GetGroupBy() != nil {
		groupByProjectionTags := criteria.GetGroupBy().GetTagProjection()
		groupByTags = make([][]*logical.Tag, len(groupByProjectionTags.GetTagFamilies()))
		for i, tagFamily := range groupByProjectionTags.GetTagFamilies() {
			groupByTags[i] = logical.NewTags(tagFamily.GetName(), tagFamily.GetTags()...)
		}
	}

	// parse fields
	plan := newUnresolvedDistributed(criteria)

	// parse limit and offset
	limitParameter := criteria.GetLimit()
	if limitParameter == 0 {
		limitParameter = defaultLimit
	}
	pushedLimit := int(limitParameter + criteria.GetOffset())

	if criteria.GetGroupBy() != nil {
		plan = newUnresolvedGroupBy(plan, groupByTags, false)
		pushedLimit = math.MaxInt
	}

	if criteria.GetAgg() != nil {
		plan = newUnresolvedAggregation(plan,
			logical.NewField(criteria.GetAgg().GetFieldName()),
			criteria.GetAgg().GetFunction(),
			criteria.GetGroupBy() != nil,
		)
		pushedLimit = math.MaxInt
	}

	if criteria.GetTop() != nil {
		plan = top(plan, criteria.GetTop())
	}

	plan = limit(plan, criteria.GetOffset(), limitParameter)
	p, err := plan.Analyze(s)
	if err != nil {
		return nil, err
	}
	rules := []logical.OptimizeRule{
		logical.NewPushDownOrder(criteria.OrderBy),
		logical.NewPushDownMaxSize(pushedLimit),
	}
	if err := logical.ApplyRules(p, rules...); err != nil {
		return nil, err
	}
	return p, nil
}

// parseFields parses the query request to decide which kind of plan should be generated.
// Basically:
// 1 - If no criteria is given, we can only scan all shards.
// 2 - If criteria is given and all of the group-by tags match the "entity" definition,
// the scan can be grouped by the entity (see the groupByEntity flag).
func parseFields(criteria *measurev1.QueryRequest, metadata *commonv1.Metadata, groupByEntity bool) logical.UnresolvedPlan {
	projFields := make([]*logical.Field, len(criteria.GetFieldProjection().GetNames()))
	for i, fieldNameProj := range criteria.GetFieldProjection().GetNames() {
		projFields[i] = logical.NewField(fieldNameProj)
	}
	timeRange := criteria.GetTimeRange()
	return indexScan(timeRange.GetBegin().AsTime(), timeRange.GetEnd().AsTime(), metadata,
		logical.ToTags(criteria.GetTagProjection()), projFields, groupByEntity, criteria.GetCriteria())
}
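
// ---------------------------------------------------------------------------
// Usage sketch (editor's addition, not part of measure_analyzer.go): a minimal
// illustration of how BuildSchema and Analyze could be wired together. The
// measure definition, index rules, and query request are assumed to come from
// the metadata repository and the gRPC query layer; buildPlan is a
// hypothetical helper name used only for this example, relying on the imports
// already declared in the file above.
// ---------------------------------------------------------------------------
func buildPlan(ctx context.Context, md *databasev1.Measure, indexRules []*databasev1.IndexRule,
	req *measurev1.QueryRequest,
) (logical.Plan, error) {
	// Turn the stored measure definition into a queryable schema
	// (tag families, fields, and the entity tag list).
	s, err := BuildSchema(md, indexRules)
	if err != nil {
		return nil, err
	}
	// Analyze folds projection, group-by, aggregation, top-N, and limit/offset
	// into an executable plan and applies the push-down optimization rules.
	return Analyze(ctx, req, md.GetMetadata(), s)
}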