banyand/query/processor_topn.go (432 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package query
import (
"bytes"
"container/heap"
"context"
"time"
"github.com/pkg/errors"
"golang.org/x/exp/slices"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/measure"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/flow"
"github.com/apache/skywalking-banyandb/pkg/flow/streaming"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query/aggregation"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
// topNQueryProcessor serves topN query events from the bus, reading the
// pre-aggregated topN series persisted alongside a source measure.
type topNQueryProcessor struct {
	// measureService resolves the source measure referenced by a TopNAggregation schema.
	measureService measure.Service
	// queryService supplies the logger and metadata registry used during query handling.
	*queryService
}
// Rev handles a topN query event from the bus. It validates the request,
// loads the TopNAggregation schema and its source measure, scans the
// matching pre-aggregated series in every companion shard, feeds each data
// point into a post processor (with or without aggregation), and replies
// with the resulting topN lists. Any failure is logged and an empty
// message is returned.
func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
	request, ok := message.Data().(*measurev1.TopNRequest)
	if !ok {
		t.log.Warn().Msg("invalid event data type")
		return
	}
	// sort direction is mandatory: it selects the min/max heap downstream
	if request.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
		t.log.Warn().Msg("invalid requested sort direction")
		return
	}
	if e := t.log.Debug(); e.Enabled() {
		e.Stringer("req", request).Msg("received a topN query event")
	}
	topNMetadata := request.GetMetadata()
	topNSchema, err := t.metaService.TopNAggregationRegistry().GetTopNAggregation(context.TODO(), topNMetadata)
	if err != nil {
		t.log.Error().Err(err).
			Str("topN", topNMetadata.GetName()).
			Msg("fail to get execution context")
		return
	}
	// a schema pinned to one direction only serves requests with that direction
	if topNSchema.GetFieldValueSort() != modelv1.Sort_SORT_UNSPECIFIED &&
		topNSchema.GetFieldValueSort() != request.GetFieldValueSort() {
		t.log.Warn().Msg("unmatched sort direction")
		return
	}
	sourceMeasure, err := t.measureService.Measure(topNSchema.GetSourceMeasure())
	if err != nil {
		t.log.Error().Err(err).
			Str("topN", topNMetadata.GetName()).
			Msg("fail to find source measure")
		return
	}
	shards, err := sourceMeasure.CompanionShards(topNMetadata)
	if err != nil {
		t.log.Error().Err(err).
			Str("topN", topNMetadata.GetName()).
			Msg("fail to list shards")
		return
	}
	aggregator := createTopNPostAggregator(request.GetTopN(),
		request.GetAgg(), request.GetFieldValueSort())
	entity, err := locateEntity(topNSchema, request.GetFieldValueSort(), request.GetConditions())
	if err != nil {
		t.log.Error().Err(err).
			Str("topN", topNMetadata.GetName()).
			Msg("fail to parse entity")
		return
	}
	for _, shard := range shards {
		// TODO: support condition
		sl, innerErr := shard.Series().List(context.WithValue(
			context.Background(),
			logger.ContextKey,
			t.log,
		), tsdb.NewPath(entity))
		if innerErr != nil {
			t.log.Error().Err(innerErr).
				Str("topN", topNMetadata.GetName()).
				Msg("fail to list series")
			return
		}
		for _, series := range sl {
			iters, scanErr := t.scanSeries(series, request)
			if scanErr != nil {
				// log the scan error itself (previously the nil innerErr was logged here)
				t.log.Error().Err(scanErr).
					Str("topN", topNMetadata.GetName()).
					Msg("fail to scan series")
				return
			}
			if len(iters) < 1 {
				continue
			}
			for _, iter := range iters {
				for iter.Next() {
					tuple, parseErr := parseTopNFamily(iter.Val(), sourceMeasure.GetInterval())
					if parseErr != nil {
						t.log.Error().Err(parseErr).
							Str("topN", topNMetadata.GetName()).
							Msg("fail to parse topN family")
						return
					}
					// the post processor never returns an error for valid sort/agg combos set above
					_ = aggregator.put(tuple.V1.([]*modelv1.TagValue), tuple.V2.(int64), iter.Val().Time())
				}
				_ = iter.Close()
			}
		}
	}
	now := time.Now().UnixNano()
	resp = bus.NewMessage(bus.MessageID(now), aggregator.val(sourceMeasure.GetSchema().GetEntity().GetTagNames()))
	return
}
// locateEntity builds the series entity used to locate topN data points.
// The entity layout is [sortDirection, rankNumber, groupBy tags...]; the
// rank slot and every groupBy slot default to a wildcard so that partial
// conditions perform a fuzzy search. Only EQ conditions on groupBy tag
// names (string or int) are accepted.
func locateEntity(topNSchema *databasev1.TopNAggregation, sortDirection modelv1.Sort,
	conditions []*modelv1.Condition,
) (tsdb.Entity, error) {
	groupByNames := topNSchema.GetGroupByTagNames()
	entity := make([]tsdb.Entry, 2+len(groupByNames))
	// slot 0: the requested sort direction
	entity[0] = convert.Int64ToBytes(int64(sortDirection.Number()))
	// slot 1: rank number, always a wildcard
	entity[1] = tsdb.AnyEntry
	positions := make(map[string]int, len(groupByNames))
	for i, name := range groupByNames {
		positions[name] = i + 2
		// wildcard by default; overwritten below when a condition pins it
		entity[i+2] = tsdb.AnyEntry
	}
	for _, cond := range conditions {
		if cond.GetOp() != modelv1.Condition_BINARY_OP_EQ {
			return nil, errors.New("op other than EQ is not supported")
		}
		pos, defined := positions[cond.GetName()]
		if !defined {
			return nil, errors.New("only groupBy tag name is supported")
		}
		switch v := cond.GetValue().GetValue().(type) {
		case *modelv1.TagValue_Str:
			entity[pos] = []byte(v.Str.GetValue())
		case *modelv1.TagValue_Int:
			entity[pos] = convert.Int64ToBytes(v.Int.GetValue())
		default:
			return nil, errors.New("unsupported condition tag type for entity")
		}
	}
	return entity, nil
}
// parseTopNFamily decodes one stored topN data point into a
// (group tag values, int64 field value) tuple. The tag family is a
// serialized TagFamilyForWrite; its first tag is skipped when building
// the group values. interval selects the field-value decoder flag.
func parseTopNFamily(item tsdb.Item, interval time.Duration) (*streaming.Tuple2, error) {
	tagPayload, err := item.Family(familyIdentity(measure.TopNTagFamily, pbv1.TagFlag))
	if err != nil {
		return nil, err
	}
	tagFamily := &modelv1.TagFamilyForWrite{}
	if err = proto.Unmarshal(tagPayload, tagFamily); err != nil {
		return nil, err
	}
	fieldPayload, err := item.Family(familyIdentity(measure.TopNValueFieldSpec.GetName(),
		pbv1.EncoderFieldFlag(measure.TopNValueFieldSpec, interval)))
	if err != nil {
		return nil, err
	}
	fieldValue, err := pbv1.DecodeFieldValue(fieldPayload, measure.TopNValueFieldSpec)
	if err != nil {
		return nil, err
	}
	return &streaming.Tuple2{
		// V1: group tag values (first tag is excluded)
		V1: tagFamily.GetTags()[1:],
		// V2: the int64 field value of the data point
		V2: fieldValue.GetInt().GetValue(),
	}, nil
}
func familyIdentity(name string, flag []byte) []byte {
return bytes.Join([][]byte{tsdb.Hash([]byte(name)), flag}, nil)
}
// scanSeries opens a span over the request's time range on the given
// series and returns time-ascending iterators over its data points.
// An empty span yields (nil, nil); the span is closed before returning.
func (t *topNQueryProcessor) scanSeries(series tsdb.Series, request *measurev1.TopNRequest) ([]tsdb.Iterator, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	timeRange := timestamp.NewInclusiveTimeRange(
		request.GetTimeRange().GetBegin().AsTime(),
		request.GetTimeRange().GetEnd().AsTime())
	seriesSpan, err := series.Span(ctx, timeRange)
	if err != nil {
		// no data in the requested range is not an error for the caller
		if errors.Is(err, tsdb.ErrEmptySeriesSpan) {
			return nil, nil
		}
		return nil, err
	}
	defer func() {
		if seriesSpan != nil {
			_ = seriesSpan.Close()
		}
	}()
	seeker, buildErr := seriesSpan.SeekerBuilder().OrderByTime(modelv1.Sort_SORT_ASC).Build()
	if buildErr != nil {
		return nil, buildErr
	}
	return seeker.Seek()
}
// postAggregationProcessor is heap-backed; assert the interface at compile time.
var _ heap.Interface = (*postAggregationProcessor)(nil)

// aggregatorItem is a heap entry that accumulates the field values
// observed for a single entity.
type aggregatorItem struct {
	// int64Func folds successive field values into one aggregate (e.g. max/min/mean).
	int64Func aggregation.Func[int64]
	// key is the stringified entity values, used for cache lookups.
	key string
	// values holds the entity's tag values, used to render result tags.
	values tsdb.EntityValues
	// index is the item's current position in the heap; maintained by Swap/Push/Pop.
	index int
}
// GetTags pairs the item's entity values with the given tag names,
// positionally, producing the result tags for this entity.
func (n *aggregatorItem) GetTags(tagNames []string) []*modelv1.Tag {
	tags := make([]*modelv1.Tag, 0, len(n.values))
	for i, v := range n.values {
		tags = append(tags, &modelv1.Tag{
			Key:   tagNames[i],
			Value: v,
		})
	}
	return tags
}
// postProcessor defines necessary methods for Top-N post processor with or without aggregation.
type postProcessor interface {
	// put merges one observed data point (entity, field value, timestamp)
	// into the processor's state.
	// NOTE(review): despite the name, timestampMillis is passed to
	// time.Unix(0, ...) by implementations, i.e. treated as Unix
	// nanoseconds — confirm the unit with the item iterator.
	put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error
	// val renders the accumulated state into topN lists, using the given
	// tag names to label each entity's values.
	val([]string) []*measurev1.TopNList
}
// createTopNPostAggregator picks the post processor for a topN request:
// an aggregating processor when aggrFunc is specified, otherwise a
// per-timestamp (timeline) processor that keeps every point in time.
func createTopNPostAggregator(topN int32, aggrFunc modelv1.AggregationFunction, sort modelv1.Sort) postProcessor {
	if aggrFunc != modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED {
		processor := &postAggregationProcessor{
			topN:     topN,
			sort:     sort,
			aggrFunc: aggrFunc,
			cache:    make(map[string]*aggregatorItem),
			items:    make([]*aggregatorItem, 0, topN),
		}
		heap.Init(processor)
		return processor
	}
	// if aggregation is not specified, we have to keep all timelines
	return &postNonAggregationProcessor{
		topN:      topN,
		sort:      sort,
		timelines: make(map[uint64]*flow.DedupPriorityQueue),
	}
}
// postAggregationProcessor is an implementation of postProcessor with aggregation.
type postAggregationProcessor struct {
	// cache maps an entity key to its heap item so repeated observations merge.
	cache map[string]*aggregatorItem
	// items is the heap storage holding at most topN items.
	items []*aggregatorItem
	// latestTimestamp is the largest timestamp seen across all put calls.
	latestTimestamp uint64
	// topN caps the number of retained entities.
	topN int32
	// sort decides whether a min heap (DESC) or max heap (ASC) is kept.
	sort modelv1.Sort
	// aggrFunc selects the aggregation applied to each entity's values.
	aggrFunc modelv1.AggregationFunction
}
// Len returns the number of items currently held in the heap.
// Uses a pointer receiver for consistency with the type's other methods
// (and to avoid copying the struct on every heap operation).
func (aggr *postAggregationProcessor) Len() int {
	return len(aggr.items)
}
// Less reports whether min/max heap has to be built.
// For DESC, a min heap has to be built,
// while for ASC, a max heap has to be built.
// Uses a pointer receiver for consistency with the type's other methods
// (and to avoid copying the struct on every comparison).
func (aggr *postAggregationProcessor) Less(i, j int) bool {
	if aggr.sort == modelv1.Sort_SORT_DESC {
		return aggr.items[i].int64Func.Val() < aggr.items[j].int64Func.Val()
	}
	return aggr.items[i].int64Func.Val() > aggr.items[j].int64Func.Val()
}
// Swap exchanges two heap slots and keeps each item's index in sync.
func (aggr *postAggregationProcessor) Swap(i, j int) {
	items := aggr.items
	items[i], items[j] = items[j], items[i]
	items[i].index, items[j].index = i, j
}
// Push appends a new item at the end of the heap storage, recording its index.
func (aggr *postAggregationProcessor) Push(x any) {
	item := x.(*aggregatorItem)
	item.index = len(aggr.items)
	aggr.items = append(aggr.items, item)
}
// Pop removes and returns the last item of the heap storage, clearing the
// freed slot so the item can be garbage collected.
func (aggr *postAggregationProcessor) Pop() any {
	last := len(aggr.items) - 1
	item := aggr.items[last]
	// drop the reference held by the backing array
	aggr.items[last] = nil
	item.index = -1
	aggr.items = aggr.items[:last]
	return item
}
// put merges one observation into the aggregator. Known entities fold the
// value into their aggregation function and re-fix the heap; new entities
// are pushed while capacity remains, otherwise they may evict the current
// worst item via tryEnqueue.
func (aggr *postAggregationProcessor) put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error {
	// update latest ts
	if aggr.latestTimestamp < timestampMillis {
		aggr.latestTimestamp = timestampMillis
	}
	key := entityValues.String()
	if item, found := aggr.cache[key]; found {
		item.int64Func.In(val)
		// The cached item is already inside the heap; restore heap order at
		// its own position. (Previously this called tryEnqueue, which could
		// duplicate the item at the root — overwriting items[0] while the
		// item also lived elsewhere in the heap — or, when the item WAS the
		// root, skip re-fixing entirely.)
		heap.Fix(aggr, item.index)
		return nil
	}
	aggrFunc, err := aggregation.NewFunc[int64](aggr.aggrFunc)
	if err != nil {
		return err
	}
	item := &aggregatorItem{
		key:       key,
		int64Func: aggrFunc,
		values:    entityValues,
	}
	item.int64Func.In(val)
	if aggr.Len() < int(aggr.topN) {
		aggr.cache[key] = item
		heap.Push(aggr, item)
	} else {
		aggr.tryEnqueue(key, item)
	}
	return nil
}
// tryEnqueue replaces the current worst item (the heap root) with the new
// item when the new item ranks better for the configured sort direction.
// The evicted item's cache entry is removed so the cache cannot keep a
// stale reference to an item that is no longer in the heap (previously it
// was left behind, letting later observations of the evicted entity hit a
// cache entry with a dangling heap index).
func (aggr *postAggregationProcessor) tryEnqueue(key string, item *aggregatorItem) {
	lowest := aggr.items[0]
	if lowest == nil {
		return
	}
	var replace bool
	if aggr.sort == modelv1.Sort_SORT_DESC {
		// min heap: evict when the newcomer is larger than the current minimum
		replace = lowest.int64Func.Val() < item.int64Func.Val()
	} else {
		// max heap: evict when the newcomer is smaller than the current maximum
		replace = lowest.int64Func.Val() > item.int64Func.Val()
	}
	if replace {
		delete(aggr.cache, lowest.key)
		aggr.cache[key] = item
		item.index = 0
		aggr.items[0] = item
		heap.Fix(aggr, 0)
	}
}
// val drains the heap into a single TopNList stamped with the latest
// observed timestamp. Popping yields worst-first order, so results are
// written from the tail of the slice to produce best-first output.
// Note: this consumes the heap; the processor is empty afterwards.
func (aggr *postAggregationProcessor) val(tagNames []string) []*measurev1.TopNList {
	ranked := make([]*measurev1.TopNList_Item, aggr.Len())
	for aggr.Len() > 0 {
		popped := heap.Pop(aggr).(*aggregatorItem)
		// after Pop, aggr.Len() is exactly the slot this item belongs in
		ranked[aggr.Len()] = &measurev1.TopNList_Item{
			Entity: popped.GetTags(tagNames),
			Value: &modelv1.FieldValue{
				Value: &modelv1.FieldValue_Int{
					Int: &modelv1.Int{Value: popped.int64Func.Val()},
				},
			},
		}
	}
	return []*measurev1.TopNList{
		{
			Timestamp: timestamppb.New(time.Unix(0, int64(aggr.latestTimestamp))),
			Items:     ranked,
		},
	}
}
// nonAggregatorItem participates in flow.DedupPriorityQueue; assert at compile time.
var _ flow.Element = (*nonAggregatorItem)(nil)

// nonAggregatorItem is a priority-queue entry carrying one raw (entity,
// value) observation for a single timestamp, without aggregation.
type nonAggregatorItem struct {
	// key is the stringified entity values.
	key string
	// values holds the entity's tag values, used to render result tags.
	values tsdb.EntityValues
	// val is the observed field value.
	val int64
	// index is the item's position in its queue; maintained via SetIndex.
	index int
}
// GetTags pairs the item's entity values with the given tag names,
// positionally, producing the result tags for this entity.
func (n *nonAggregatorItem) GetTags(tagNames []string) []*modelv1.Tag {
	tags := make([]*modelv1.Tag, 0, len(n.values))
	for i, v := range n.values {
		tags = append(tags, &modelv1.Tag{
			Key:   tagNames[i],
			Value: v,
		})
	}
	return tags
}
// GetIndex returns the item's current position in its priority queue.
func (n *nonAggregatorItem) GetIndex() int {
	return n.index
}

// SetIndex records the item's current position; called by the queue on moves.
func (n *nonAggregatorItem) SetIndex(i int) {
	n.index = i
}
// postNonAggregationProcessor is an implementation of postProcessor that
// keeps a separate topN queue per timestamp instead of aggregating.
type postNonAggregationProcessor struct {
	// timelines maps a timestamp to the priority queue of its top items.
	timelines map[uint64]*flow.DedupPriorityQueue
	// topN caps the number of retained items per timestamp.
	topN int32
	// sort decides which items are kept when a queue is full.
	sort modelv1.Sort
}
// val renders one TopNList per timestamp from the per-timestamp queues and
// returns the lists sorted by timestamp ascending. Items within a list are
// emitted in the queue's internal order.
func (naggr *postNonAggregationProcessor) val(tagNames []string) []*measurev1.TopNList {
	lists := make([]*measurev1.TopNList, 0, len(naggr.timelines))
	for ts, timeline := range naggr.timelines {
		entries := timeline.Values()
		rendered := make([]*measurev1.TopNList_Item, len(entries))
		for i, raw := range entries {
			item := raw.(*nonAggregatorItem)
			rendered[i] = &measurev1.TopNList_Item{
				Entity: item.GetTags(tagNames),
				Value: &modelv1.FieldValue{
					Value: &modelv1.FieldValue_Int{
						Int: &modelv1.Int{Value: item.val},
					},
				},
			}
		}
		lists = append(lists, &measurev1.TopNList{
			Timestamp: timestamppb.New(time.Unix(0, int64(ts))),
			Items:     rendered,
		})
	}
	// map iteration order is random: sort by (seconds, nanos) ascending
	slices.SortStableFunc(lists, func(a, b *measurev1.TopNList) bool {
		aSec, bSec := a.GetTimestamp().GetSeconds(), b.GetTimestamp().GetSeconds()
		if aSec != bSec {
			return aSec < bSec
		}
		return a.GetTimestamp().GetNanos() < b.GetTimestamp().GetNanos()
	})
	return lists
}
// put records one observation under its timestamp's queue. A new queue is
// created on first sight of a timestamp; a full queue only admits the item
// by replacing its current worst entry when the newcomer ranks better for
// the configured sort direction. Always returns nil.
func (naggr *postNonAggregationProcessor) put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error {
	item := &nonAggregatorItem{val: val, key: entityValues.String(), values: entityValues}
	timeline, exists := naggr.timelines[timestampMillis]
	if !exists {
		// first observation for this timestamp: build its queue.
		// The comparator orders worst-first so Peek() exposes the evictable item:
		// ascending values for DESC results, descending values for ASC results.
		timeline = flow.NewPriorityQueue(func(a, b interface{}) int {
			left := a.(*nonAggregatorItem).val
			right := b.(*nonAggregatorItem).val
			var rank int
			switch {
			case left < right:
				rank = -1
			case left > right:
				rank = 1
			}
			if naggr.sort != modelv1.Sort_SORT_DESC {
				rank = -rank
			}
			return rank
		}, false)
		naggr.timelines[timestampMillis] = timeline
		heap.Push(timeline, item)
		return nil
	}
	if timeline.Len() < int(naggr.topN) {
		heap.Push(timeline, item)
		return nil
	}
	lowest := timeline.Peek()
	if lowest == nil {
		return nil
	}
	worst := lowest.(*nonAggregatorItem).val
	if (naggr.sort == modelv1.Sort_SORT_DESC && worst < val) ||
		(naggr.sort != modelv1.Sort_SORT_DESC && worst > val) {
		timeline.ReplaceLowest(item)
	}
	return nil
}