pkg/query/logical/stream/stream_analyzer.go (140 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package stream
import (
"context"
"fmt"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/stream"
"github.com/apache/skywalking-banyandb/pkg/query/executor"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
)
// defaultLimit is the limit Analyze applies when the request's limit is zero.
const defaultLimit uint32 = 20
// BuildSchema wraps the given stream in a logical.Schema.
//
// The resulting schema carries the stream's index rules, the tag names of its
// entity, and a tag spec map populated by registering every tag family of the
// stream's schema definition. The returned error is always nil.
func BuildSchema(streamSchema stream.Stream) (logical.Schema, error) {
	definition := streamSchema.GetSchema()
	common := &logical.CommonSchema{
		IndexRules: streamSchema.GetIndexRules(),
		TagSpecMap: make(map[string]*logical.TagSpec),
		EntityList: definition.GetEntity().GetTagNames(),
	}
	// Fill TagSpecMap from the tag families before handing the schema out.
	common.RegisterTagFamilies(definition.GetTagFamilies())
	return &schema{
		common: common,
		stream: definition,
	}, nil
}
// Analyze converts logical expressions to executable operation tree represented by Plan.
//
// The unresolved tree is built bottom-up as limit(offset(tagFilter)): the leaf
// comes from parseTags, the request's offset is applied on top of it, and the
// limit is applied last. A zero limit in the request is replaced by
// defaultLimit. The tree is then resolved against s, and the order-by and
// max-size push-down rules are applied to the resolved plan.
//
// The context parameter is currently unused. Any error from resolving the
// plan or applying the rules is returned as-is with a nil plan.
func Analyze(_ context.Context, criteria *streamv1.QueryRequest, metadata *commonv1.Metadata, s logical.Schema) (logical.Plan, error) {
	// parse fields
	plan := parseTags(criteria, metadata)
	// parse offset
	plan = newOffset(plan, criteria.GetOffset())
	// parse limit
	limitParameter := criteria.GetLimit()
	if limitParameter == 0 {
		limitParameter = defaultLimit
	}
	plan = newLimit(plan, limitParameter)
	p, err := plan.Analyze(s)
	if err != nil {
		return nil, err
	}

	// The lower layers must produce limit+offset elements so that the offset
	// node can skip its share and the limit node still has enough left.
	// Both operands are uint32, so summing them in uint32 wraps around for
	// large values and would push down a far too small max size. Sum in
	// uint64 instead and clamp to the largest int of the platform.
	const maxInt = uint64(^uint(0) >> 1)
	maxSize := uint64(limitParameter) + uint64(criteria.GetOffset())
	if maxSize > maxInt {
		maxSize = maxInt
	}

	rules := []logical.OptimizeRule{
		logical.NewPushDownOrder(criteria.OrderBy),
		logical.NewPushDownMaxSize(int(maxSize)),
	}
	if err := logical.ApplyRules(p, rules...); err != nil {
		return nil, err
	}
	return p, nil
}
// Compile-time assertions that *limit implements both logical.Plan and
// logical.UnresolvedPlan.
var (
_ logical.Plan = (*limit)(nil)
_ logical.UnresolvedPlan = (*limit)(nil)
)
// Parent refers to a parent node in the execution tree (plan).
// It holds the node's single child in both its unresolved and resolved form.
type Parent struct {
// UnresolvedInput is the child plan before analysis; the limit and offset
// nodes call its Analyze method to resolve it.
UnresolvedInput logical.UnresolvedPlan
// Input is the resolved child plan; the limit and offset nodes assign it in
// their Analyze methods and read it in Execute, Schema, String and Children.
Input logical.Plan
}
// limit is a plan node that truncates the elements produced by its child
// to at most LimitNum entries.
type limit struct {
*Parent
// LimitNum is the maximum number of elements Execute returns.
LimitNum uint32
}
// Execute runs the child plan and returns at most LimitNum of its elements,
// keeping the leading ones. An error from the child is returned unchanged.
// It panics if the child plan does not implement executor.StreamExecutable.
func (l *limit) Execute(ec executor.StreamExecutionContext) ([]*streamv1.Element, error) {
	child := l.Parent.Input.(executor.StreamExecutable)
	elements, err := child.Execute(ec)
	if err != nil {
		return nil, err
	}
	// Nothing to cut when the child produced no more than the limit.
	if len(elements) <= int(l.LimitNum) {
		return elements, nil
	}
	return elements[:l.LimitNum], nil
}
// Analyze resolves the unresolved child against s, stores the result as the
// node's Input and returns the node itself as the resolved plan. If resolving
// the child fails, the error is returned with a nil plan.
func (l *limit) Analyze(s logical.Schema) (logical.Plan, error) {
	var err error
	if l.Parent.Input, err = l.Parent.UnresolvedInput.Analyze(s); err != nil {
		return nil, err
	}
	return l, nil
}
// Schema returns the schema of the resolved child plan.
func (l *limit) Schema() logical.Schema {
	child := l.Parent.Input
	return child.Schema()
}
// String renders the child plan followed by this node's limit.
func (l *limit) String() string {
	child := l.Parent.Input.String()
	return fmt.Sprintf("%s Limit: %d", child, l.LimitNum)
}
// Children returns a one-element slice holding the resolved child plan.
func (l *limit) Children() []logical.Plan {
	child := l.Parent.Input
	return []logical.Plan{child}
}
// newLimit creates an unresolved limit node on top of input that keeps at
// most num elements.
func newLimit(input logical.UnresolvedPlan, num uint32) logical.UnresolvedPlan {
	parent := &Parent{UnresolvedInput: input}
	return &limit{Parent: parent, LimitNum: num}
}
// Compile-time assertions that *offset implements both logical.Plan and
// logical.UnresolvedPlan.
var (
_ logical.Plan = (*offset)(nil)
_ logical.UnresolvedPlan = (*offset)(nil)
)
// offset is a plan node that skips the leading offsetNum elements produced
// by its child.
type offset struct {
*Parent
// offsetNum is the number of leading elements Execute drops.
offsetNum uint32
}
// Execute runs the child plan and returns its elements with the first
// offsetNum dropped. When the child yields offsetNum elements or fewer, an
// empty, non-nil slice is returned. An error from the child is returned
// unchanged. It panics if the child plan does not implement
// executor.StreamExecutable.
func (l *offset) Execute(ec executor.StreamExecutionContext) ([]*streamv1.Element, error) {
	child := l.Parent.Input.(executor.StreamExecutable)
	all, err := child.Execute(ec)
	if err != nil {
		return nil, err
	}
	// The whole result is skipped when it is not longer than the offset.
	if len(all) <= int(l.offsetNum) {
		return []*streamv1.Element{}, nil
	}
	return all[l.offsetNum:], nil
}
// Analyze resolves the unresolved child against s, stores the result as the
// node's Input and returns the node itself as the resolved plan. If resolving
// the child fails, the error is returned with a nil plan.
func (l *offset) Analyze(s logical.Schema) (logical.Plan, error) {
	var err error
	if l.Parent.Input, err = l.Parent.UnresolvedInput.Analyze(s); err != nil {
		return nil, err
	}
	return l, nil
}
// Schema returns the schema of the resolved child plan.
func (l *offset) Schema() logical.Schema {
	child := l.Parent.Input
	return child.Schema()
}
// String renders the child plan followed by this node's offset.
func (l *offset) String() string {
	child := l.Parent.Input.String()
	return fmt.Sprintf("%s Offset: %d", child, l.offsetNum)
}
// Children returns a one-element slice holding the resolved child plan.
func (l *offset) Children() []logical.Plan {
	child := l.Parent.Input
	return []logical.Plan{child}
}
// newOffset creates an unresolved offset node on top of input that skips the
// first num elements.
func newOffset(input logical.UnresolvedPlan, num uint32) logical.UnresolvedPlan {
	parent := &Parent{UnresolvedInput: input}
	return &offset{Parent: parent, offsetNum: num}
}
// parseTags builds the leaf unresolved plan for a query request.
//
// It takes the begin and end of the request's time range, the request's
// criteria, and its projection converted with logical.ToTags, and passes them
// together with metadata to tagFilter, whose result is returned.
//
// NOTE(review): earlier documentation described choosing between scanning all
// shards (no criteria given) and using the "entity" tags as sharding keys
// (e.g. service_id + service_instance_id + state for skywalking's stream
// schema). That decision is not made in this function; presumably it lives in
// tagFilter — confirm.
func parseTags(criteria *streamv1.QueryRequest, metadata *commonv1.Metadata) logical.UnresolvedPlan {
	timeRange := criteria.GetTimeRange()
	begin := timeRange.GetBegin().AsTime()
	end := timeRange.GetEnd().AsTime()
	filter := criteria.Criteria
	projection := logical.ToTags(criteria.GetProjection())
	return tagFilter(begin, end, metadata, filter, projection)
}