pkg/query/logical/measure/measure_plan_groupby.go (239 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package measure
import (
"fmt"
"math"
"github.com/cespare/xxhash"
"github.com/pkg/errors"
"go.uber.org/multierr"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/query/executor"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
)
// Compile-time assertions that the group-by plan types implement the logical
// planner interfaces.
var _ logical.UnresolvedPlan = (*unresolvedGroup)(nil)

var _ logical.Plan = (*groupBy)(nil)
// unresolvedGroup is the not-yet-analyzed form of a group-by plan node. It
// wraps the unresolved upstream plan and records the requested group-by tags;
// Analyze turns it into a groupBy plan.
type unresolvedGroup struct {
	// unresolvedInput is the upstream plan whose output is grouped.
	unresolvedInput logical.UnresolvedPlan

	// groupBy lists the group-by tags; they should be a subset of the tag
	// projection, otherwise resolving them in Analyze fails.
	groupBy [][]*logical.Tag

	// groupByEntity selects sort-based (streaming) grouping instead of
	// hash-based grouping when the analyzed plan is executed.
	groupByEntity bool
}
// newUnresolvedGroupBy creates an unresolved group-by node on top of input.
// groupBy names the tags to group on; groupByEntity picks the sort-based
// execution strategy instead of the hash-based one.
func newUnresolvedGroupBy(input logical.UnresolvedPlan, groupBy [][]*logical.Tag, groupByEntity bool) logical.UnresolvedPlan {
	plan := new(unresolvedGroup)
	plan.unresolvedInput = input
	plan.groupBy = groupBy
	plan.groupByEntity = groupByEntity
	return plan
}
// Analyze resolves the input plan against measureSchema and then resolves the
// group-by tags against the input's schema, producing an executable groupBy
// plan. Errors from either step are returned unchanged.
func (gba *unresolvedGroup) Analyze(measureSchema logical.Schema) (logical.Plan, error) {
	input, err := gba.unresolvedInput.Analyze(measureSchema)
	if err != nil {
		return nil, err
	}

	inputSchema := input.Schema()
	// Resolving the group-by tags doubles as the validity check: CreateTagRef
	// is relied on to reject tags the input schema does not provide.
	refs, err := inputSchema.CreateTagRef(gba.groupBy...)
	if err != nil {
		return nil, err
	}

	plan := &groupBy{
		Parent: &logical.Parent{
			Input:           input,
			UnresolvedInput: gba.unresolvedInput,
		},
		groupByEntity:   gba.groupByEntity,
		groupByTagsRefs: refs,
		schema:          inputSchema,
	}
	return plan, nil
}
// groupBy is the analyzed group-by plan node. Executing it groups the data
// points produced by its input plan by the values of the group-by tags.
type groupBy struct {
	// Parent holds the analyzed input plan (Input) and its unresolved form
	// (UnresolvedInput).
	*logical.Parent

	// schema is the input plan's schema; Schema() projects it down to the
	// group-by tags.
	schema logical.Schema

	// groupByTagsRefs are the group-by tags resolved against schema.
	groupByTagsRefs [][]*logical.TagRef

	// groupByEntity selects sort-based grouping (consecutive equal keys are
	// merged while streaming) instead of hash-based grouping (the whole input
	// is buffered in a map).
	groupByEntity bool
}
// String renders the plan for explain output: the input plan, followed by the
// group-by tags and the execution strategy ("sort" or "hash").
func (g *groupBy) String() string {
	method := "hash"
	if g.groupByEntity {
		method = "sort"
	}
	tags := logical.FormatTagRefs(", ", g.groupByTagsRefs...)
	return fmt.Sprintf("%s GroupBy: groupBy=%s, method=%s", g.Input, tags, method)
}
// Children returns the single input plan of this node.
func (g *groupBy) Children() []logical.Plan {
	children := make([]logical.Plan, 0, 1)
	return append(children, g.Parent.Input)
}
// Schema returns the input schema projected onto the group-by tags.
func (g *groupBy) Schema() logical.Schema {
	projected := g.schema.ProjTags(g.groupByTagsRefs...)
	return projected
}
// Execute runs the group-by using hash-based grouping by default, or
// sort-based grouping when groupByEntity is set.
func (g *groupBy) Execute(ec executor.MeasureExecutionContext) (executor.MIterator, error) {
	if !g.groupByEntity {
		return g.hash(ec)
	}
	return g.sort(ec)
}
// sort executes the input plan and wraps its iterator in a streaming
// iterator that merges consecutive data points with equal group-by keys.
// The input iterator is handed over and is closed by the returned iterator.
func (g *groupBy) sort(ec executor.MeasureExecutionContext) (executor.MIterator, error) {
	input := g.Parent.Input.(executor.MeasureExecutable)
	iter, err := input.Execute(ec)
	if err != nil {
		return nil, err
	}
	return newGroupSortIterator(iter, g.groupByTagsRefs), nil
}
// hash executes the input plan, drains it completely and buckets every data
// point by its group-by key. Groups are yielded in the order in which their
// keys were first seen.
//
// The input iterator is always closed before returning; a Close error is
// merged into the returned error.
func (g *groupBy) hash(ec executor.MeasureExecutionContext) (mit executor.MIterator, err error) {
	iter, err := g.Parent.Input.(executor.MeasureExecutable).Execute(ec)
	if err != nil {
		return nil, err
	}
	defer func() {
		err = multierr.Append(err, iter.Close())
	}()

	groupMap := make(map[uint64][]*measurev1.DataPoint)
	// groupLst preserves first-seen order, since map iteration order is random.
	var groupLst []uint64
	for iter.Next() {
		for _, dp := range iter.Current() {
			key, innerErr := formatGroupByKey(dp, g.groupByTagsRefs)
			if innerErr != nil {
				return nil, innerErr
			}
			group, ok := groupMap[key]
			if !ok {
				groupLst = append(groupLst, key)
			}
			// append on the nil slice of a new key allocates the group.
			groupMap[key] = append(group, dp)
		}
	}
	return newGroupIterator(groupMap, groupLst), nil
}
// formatGroupByKey hashes the values of the group-by tags of point into a
// single uint64 key. Data points with equal group-by tag values get equal
// keys; different values get different keys, barring xxhash collisions.
//
// Each tag value is encoded with a one-byte type marker, and string values
// are length-prefixed, so the concatenated byte stream is unambiguous.
// Without this, ("ab", "c") and ("a", "bc") would hash identically, as would
// an int and an 8-byte string with the same bytes. Values of any type not
// handled below (e.g. an unset value) all encode to the same marker, so they
// still fall into one group.
//
// It returns an error when a tag reference points outside the data point's
// tag families or tags, or when a group-by tag holds an array or binary value.
func formatGroupByKey(point *measurev1.DataPoint, groupByTagsRefs [][]*logical.TagRef) (uint64, error) {
	// Type markers written in front of every encoded tag value.
	const (
		markerOther byte = iota
		markerStr
		markerInt
	)
	hash := xxhash.New()
	tagFamilies := point.GetTagFamilies()
	// buf is reused across tags to avoid an allocation per value.
	var buf []byte
	for _, tagFamilyRef := range groupByTagsRefs {
		for _, tagRef := range tagFamilyRef {
			familyIdx, tagIdx := tagRef.Spec.TagFamilyIdx, tagRef.Spec.TagIdx
			if familyIdx < 0 || familyIdx >= len(tagFamilies) {
				return 0, fmt.Errorf("group-by tag family index %d is out of range [0, %d)", familyIdx, len(tagFamilies))
			}
			tags := tagFamilies[familyIdx].GetTags()
			if tagIdx < 0 || tagIdx >= len(tags) {
				return 0, fmt.Errorf("group-by tag index %d is out of range [0, %d) in tag family %d", tagIdx, len(tags), familyIdx)
			}
			switch v := tags[tagIdx].GetValue().GetValue().(type) {
			case *modelv1.TagValue_Str:
				s := v.Str.GetValue()
				buf = append(buf[:0], markerStr)
				buf = append(buf, convert.Int64ToBytes(int64(len(s)))...)
				buf = append(buf, s...)
			case *modelv1.TagValue_Int:
				buf = append(buf[:0], markerInt)
				buf = append(buf, convert.Int64ToBytes(v.Int.GetValue())...)
			case *modelv1.TagValue_IntArray, *modelv1.TagValue_StrArray, *modelv1.TagValue_BinaryData:
				return 0, errors.New("group-by on array/binary tag is not supported")
			default:
				buf = append(buf[:0], markerOther)
			}
			if _, err := hash.Write(buf); err != nil {
				return 0, err
			}
		}
	}
	return hash.Sum64(), nil
}
// groupIterator yields pre-computed groups, one group of data points per Next
// call, in the order given by groupLst.
type groupIterator struct {
	// groupMap maps a group-by key to the data points of that group.
	groupMap map[uint64][]*measurev1.DataPoint

	// groupLst holds the keys in iteration order.
	groupLst []uint64

	// index is the position in groupLst: -1 before the first Next, and
	// math.MaxInt after Close so that Next keeps returning false.
	index int
}
// newGroupIterator returns an iterator over the groups of groupedMap, visited
// in the key order given by groupLst.
func newGroupIterator(groupedMap map[uint64][]*measurev1.DataPoint, groupLst []uint64) executor.MIterator {
	it := &groupIterator{index: -1} // positioned before the first group
	it.groupMap, it.groupLst = groupedMap, groupLst
	return it
}
// Next moves to the next group and reports whether one exists.
func (gmi *groupIterator) Next() bool {
	// Compare before incrementing: Close parks index at math.MaxInt, and
	// index+1 would overflow.
	hasMore := gmi.index < len(gmi.groupLst)-1
	if hasMore {
		gmi.index++
	}
	return hasMore
}
// Current returns the data points of the group selected by the last
// successful Next call. It must not be called before Next or after Close.
func (gmi *groupIterator) Current() []*measurev1.DataPoint {
	return gmi.groupMap[gmi.groupLst[gmi.index]]
}
// Close exhausts the iterator; it never fails.
func (gmi *groupIterator) Close() error {
	// Park the cursor beyond any possible end so later Next calls return false.
	const exhausted = math.MaxInt
	gmi.index = exhausted
	return nil
}
// groupSortIterator groups an input stream on the fly. Each Next call gathers
// one run of consecutive data points that share a group-by key, so equal keys
// are merged only when the input delivers them back to back (presumably
// guaranteed by the entity ordering when groupByEntity is set — confirm with
// the producer of the input).
type groupSortIterator struct {
	// iter is the upstream iterator; it is closed by Close.
	iter executor.MIterator

	// err records a key computation failure; Close returns it.
	err error

	// cdp is the data point that ended the previous run; it opens the next one.
	cdp *measurev1.DataPoint

	// groupByTagsRefs are the tags hashed into each data point's key.
	groupByTagsRefs [][]*logical.TagRef

	// current holds the data points of the run returned by Current.
	current []*measurev1.DataPoint

	// index is the position inside iter.Current(); -1 means the next batch
	// has to be pulled from iter.
	index int

	// key is the group-by key of the run being assembled.
	key uint64

	// closed is set once the input is exhausted, a key error occurred, or
	// Close was called.
	closed bool
}
// newGroupSortIterator wraps iter in a streaming iterator that merges
// consecutive data points with equal group-by keys. Ownership of iter passes
// to the returned iterator.
func newGroupSortIterator(iter executor.MIterator, groupByTagsRefs [][]*logical.TagRef) executor.MIterator {
	it := new(groupSortIterator)
	it.iter = iter
	it.groupByTagsRefs = groupByTagsRefs
	it.index = -1 // no batch has been pulled from iter yet
	return it
}
// Next collects the next run of consecutive data points that share a
// group-by key and reports whether such a run exists. Because grouping is
// purely sequential, the input is expected to deliver each group contiguously.
//
// The data point that ends a run is kept in cdp and opens the following run.
// If computing a key fails, the iterator stops, Next returns false, and the
// error is reported by Close.
func (gmi *groupSortIterator) Next() bool {
	if gmi.closed {
		return false
	}
	// Start every run in a fresh slice so the slice handed out by a previous
	// Current call is never overwritten in place.
	gmi.current = nil
	if gmi.cdp != nil {
		// gmi.key already holds the key of this carried-over data point.
		gmi.current = append(gmi.current, gmi.cdp)
		gmi.cdp = nil
	}
	for {
		dp, ok := gmi.nextDP()
		if !ok {
			gmi.closed = true
			return len(gmi.current) > 0
		}
		k, err := formatGroupByKey(dp, gmi.groupByTagsRefs)
		if err != nil {
			gmi.closed = true
			gmi.err = err
			return false
		}
		switch {
		case len(gmi.current) == 0:
			// Very first data point: adopt its key. Every uint64 (including 0)
			// is a valid key, so an empty run, not a zero key, is the sentinel.
			gmi.key = k
		case k != gmi.key:
			gmi.cdp = dp
			gmi.key = k
			return true
		}
		gmi.current = append(gmi.current, dp)
	}
}
// Current returns the data points assembled by the most recent Next call.
func (gmi *groupSortIterator) Current() []*measurev1.DataPoint {
	group := gmi.current
	return group
}
// Close stops the iterator and closes the upstream iterator. It returns any
// key computation error recorded by Next combined with the upstream Close error.
func (gmi *groupSortIterator) Close() error {
	gmi.closed = true
	recorded := gmi.err
	closeErr := gmi.iter.Close()
	return multierr.Combine(recorded, closeErr)
}
// nextDP flattens the batches of the upstream iterator into a stream of single
// data points. It returns false once the upstream iterator is exhausted;
// empty batches are skipped.
func (gmi *groupSortIterator) nextDP() (*measurev1.DataPoint, bool) {
	for {
		if gmi.index >= 0 {
			gmi.index++
		} else {
			if !gmi.iter.Next() {
				return nil, false
			}
			gmi.index = 0
		}
		batch := gmi.iter.Current()
		if gmi.index < len(batch) {
			return batch[gmi.index], true
		}
		// Batch empty or fully consumed: pull the next one on the next pass.
		gmi.index = -1
	}
}