pkg/query/logical/measure/measure_plan_top.go (105 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package measure
import (
"context"
"fmt"
"github.com/pkg/errors"
"go.uber.org/multierr"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/pkg/query/executor"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
)
var (
_ logical.UnresolvedPlan = (*unresolvedGroup)(nil)
_ logical.Plan = (*groupBy)(nil)
)
type unresolvedTop struct {
unresolvedInput logical.UnresolvedPlan
top *measurev1.QueryRequest_Top
}
func top(input logical.UnresolvedPlan, top *measurev1.QueryRequest_Top) logical.UnresolvedPlan {
return &unresolvedTop{
unresolvedInput: input,
top: top,
}
}
func (gba *unresolvedTop) Analyze(measureSchema logical.Schema) (logical.Plan, error) {
prevPlan, err := gba.unresolvedInput.Analyze(measureSchema)
if err != nil {
return nil, err
}
fieldRefs, err := prevPlan.Schema().CreateFieldRef(logical.NewField(gba.top.FieldName))
if err != nil {
return nil, err
}
if len(fieldRefs) == 0 {
return nil, errors.Wrap(errFieldNotDefined, "top schema")
}
reverted := false
if gba.top.FieldValueSort == modelv1.Sort_SORT_ASC {
reverted = true
}
return &topOp{
Parent: &logical.Parent{
UnresolvedInput: gba.unresolvedInput,
Input: prevPlan,
},
topNStream: NewTopQueue(int(gba.top.Number), reverted),
fieldRef: fieldRefs[0],
}, nil
}
type topOp struct {
*logical.Parent
topNStream *TopQueue
fieldRef *logical.FieldRef
}
func (g *topOp) String() string {
return fmt.Sprintf("%s top %s", g.Input, g.topNStream.String())
}
func (g *topOp) Children() []logical.Plan {
return []logical.Plan{g.Input}
}
func (g *topOp) Schema() logical.Schema {
return g.Input.Schema()
}
func (g *topOp) Execute(ec context.Context) (mit executor.MIterator, err error) {
iter, err := g.Parent.Input.(executor.MeasureExecutable).Execute(ec)
if err != nil {
return nil, err
}
defer func() {
err = multierr.Append(err, iter.Close())
}()
g.topNStream.Purge()
for iter.Next() {
dpp := iter.Current()
for _, dp := range dpp {
value := dp.GetFields()[g.fieldRef.Spec.FieldIdx].
GetValue().
GetInt().
GetValue()
g.topNStream.Insert(NewTopElement(dp, value))
}
}
return newTopIterator(g.topNStream.Elements()), nil
}
type topIterator struct {
elements []TopElement
index int
}
func newTopIterator(elements []TopElement) executor.MIterator {
return &topIterator{
elements: elements,
index: -1,
}
}
func (ami *topIterator) Next() bool {
ami.index++
return ami.index < len(ami.elements)
}
func (ami *topIterator) Current() []*measurev1.DataPoint {
return []*measurev1.DataPoint{ami.elements[ami.index].dp}
}
func (ami *topIterator) Close() error {
return nil
}