pkg/flow/streaming/topn.go (229 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package streaming
import (
"strconv"
"strings"
"time"
"github.com/emirpasic/gods/maps/treemap"
"github.com/emirpasic/gods/utils"
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/pkg/flow"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
// TopNSort defines the order of sorting.
type TopNSort uint8
// The available order of sorting.
const (
DESC TopNSort = iota
ASC
)
type windowedFlow struct {
f *streamingFlow
wa flow.WindowAssigner
l *logger.Logger
}
func (s *windowedFlow) TopN(topNum int, opts ...any) flow.Flow {
s.wa.(*tumblingTimeWindows).aggregationFactory = func() flow.AggregationOp {
topNAggrFunc := &topNAggregatorGroup{
cacheSize: topNum,
sort: DESC,
l: s.l,
}
// apply user customized options
for _, opt := range opts {
if applier, ok := opt.(TopNOption); ok {
applier(topNAggrFunc)
}
}
if topNAggrFunc.sortKeyExtractor == nil {
s.f.drainErr(errors.New("sortKeyExtractor must be specified"))
}
if topNAggrFunc.sort == ASC {
topNAggrFunc.comparator = utils.Int64Comparator
} else { // DESC
topNAggrFunc.comparator = func(a, b interface{}) int {
return utils.Int64Comparator(b, a)
}
}
topNAggrFunc.aggregatorGroup = make(map[string]*topNAggregator)
return topNAggrFunc
}
return s.f
}
type topNAggregatorGroup struct {
aggregatorGroup map[string]*topNAggregator
keyExtractor func(flow.StreamRecord) uint64
sortKeyExtractor func(flow.StreamRecord) int64
groupKeyExtractor func(flow.StreamRecord) string
comparator utils.Comparator
l *logger.Logger
cacheSize int
sort TopNSort
}
type topNAggregator struct {
*topNAggregatorGroup
treeMap *treemap.Map
dict map[uint64]int64
dirty bool
}
// TopNOption is the option to set up a top-n aggregator group.
type TopNOption func(aggregator *topNAggregatorGroup)
// WithSortKeyExtractor sets a closure to extract the sorting key.
func WithSortKeyExtractor(sortKeyExtractor func(flow.StreamRecord) int64) TopNOption {
return func(aggregator *topNAggregatorGroup) {
aggregator.sortKeyExtractor = sortKeyExtractor
}
}
// WithKeyExtractor sets a closure to extract the key.
func WithKeyExtractor(keyExtractor func(flow.StreamRecord) uint64) TopNOption {
return func(aggregator *topNAggregatorGroup) {
aggregator.keyExtractor = keyExtractor
}
}
// WithGroupKeyExtractor extract group key from the StreamRecord.
func WithGroupKeyExtractor(groupKeyExtractor func(flow.StreamRecord) string) TopNOption {
return func(aggregator *topNAggregatorGroup) {
aggregator.groupKeyExtractor = groupKeyExtractor
}
}
// OrderBy sets the sorting order.
func OrderBy(sort TopNSort) TopNOption {
return func(aggregator *topNAggregatorGroup) {
aggregator.sort = sort
}
}
func (t *topNAggregatorGroup) Add(input []flow.StreamRecord) {
for _, item := range input {
key := t.keyExtractor(item)
sortKey := t.sortKeyExtractor(item)
groupKey := t.groupKeyExtractor(item)
aggregator := t.getOrCreateGroup(groupKey)
aggregator.removeExistedItem(key)
if aggregator.checkSortKeyInBufferRange(sortKey) {
if e := t.l.Debug(); e.Enabled() {
e.Str("group", groupKey).Uint64("key", key).Time("elem_ts", time.Unix(0, item.TimestampMillis()*int64(time.Millisecond))).Msg("put into topN buffer")
}
aggregator.put(key, sortKey, item)
aggregator.doCleanUp()
}
}
}
func (t *topNAggregatorGroup) Snapshot() interface{} {
groupRanks := make(map[string][]*Tuple2)
for group, aggregator := range t.aggregatorGroup {
if !aggregator.dirty {
continue
}
aggregator.dirty = false
iter := aggregator.treeMap.Iterator()
items := make([]*Tuple2, 0, aggregator.size())
for iter.Next() {
list := iter.Value().([]interface{})
for _, item := range list {
items = append(items, &Tuple2{iter.Key(), item})
}
}
groupRanks[group] = items
}
if len(groupRanks) > 0 {
if e := t.l.Debug(); e.Enabled() {
sb := strings.Builder{}
for g, item := range groupRanks {
sb.WriteString("{")
sb.WriteString(g)
sb.WriteString(":")
sb.WriteString(strconv.Itoa(len(item)))
sb.WriteString("}")
}
t.l.Debug().Interface("snapshot", sb.String()).Msg("taken a topN snapshot")
}
}
return groupRanks
}
func (t *topNAggregatorGroup) Dirty() bool {
for _, aggregator := range t.aggregatorGroup {
if aggregator.dirty {
return true
}
}
return false
}
func (t *topNAggregatorGroup) getOrCreateGroup(group string) *topNAggregator {
aggregator, groupExist := t.aggregatorGroup[group]
if groupExist {
return aggregator
}
t.aggregatorGroup[group] = &topNAggregator{
topNAggregatorGroup: t,
treeMap: treemap.NewWith(t.comparator),
dict: make(map[uint64]int64),
}
return t.aggregatorGroup[group]
}
func (t *topNAggregator) doCleanUp() {
// do cleanup: maintain the treeMap windowSize
if t.size() > t.cacheSize {
lastKey, lastValues := t.treeMap.Max()
l := lastValues.([]interface{})
delete(t.dict, t.keyExtractor(l[len(l)-1].(flow.StreamRecord)))
// remove last one
if len(l) <= 1 {
t.treeMap.Remove(lastKey)
} else {
t.treeMap.Put(lastKey, l[:len(l)-1])
}
}
}
func (t *topNAggregator) put(key uint64, sortKey int64, data flow.StreamRecord) {
t.dirty = true
if existingList, ok := t.treeMap.Get(sortKey); ok {
existingList = append(existingList.([]interface{}), data)
t.treeMap.Put(sortKey, existingList)
} else {
t.treeMap.Put(sortKey, []interface{}{data})
}
t.dict[key] = sortKey
}
func (t *topNAggregator) checkSortKeyInBufferRange(sortKey int64) bool {
// get the "maximum" item
// - if ASC, the maximum item
// - else DESC, the minimum item
worstKey, _ := t.treeMap.Max()
if worstKey == nil {
// return true if the buffer is empty.
return true
}
if t.comparator(sortKey, worstKey.(int64)) < 0 {
return true
}
return t.size() < t.cacheSize
}
func (t *topNAggregator) removeExistedItem(key uint64) {
existed, ok := t.dict[key]
if !ok {
return
}
delete(t.dict, key)
list, ok := t.treeMap.Get(existed)
if !ok {
return
}
l := list.([]interface{})
for i := 0; i < len(l); i++ {
if t.keyExtractor(l[i].(flow.StreamRecord)) == key {
l = append(l[:i], l[i+1:]...)
}
}
if len(l) == 0 {
t.treeMap.Remove(existed)
return
}
t.treeMap.Put(existed, l)
}
func (t *topNAggregator) size() int {
return len(t.dict)
}
func (t *topNAggregatorGroup) leakCheck() {
for g, agg := range t.aggregatorGroup {
if agg.size() > t.cacheSize {
panic(g + "leak detected: topN buffer size exceed the cache size")
}
iter := agg.treeMap.Iterator()
count := 0
for iter.Next() {
count += len(iter.Value().([]interface{}))
}
if count != agg.size() {
panic(g + "leak detected: treeMap size not match dictionary size")
}
}
}
// Tuple2 is a tuple with 2 fields. Each field may be a separate type.
type Tuple2 struct {
V1 interface{} `json:"v1"`
V2 interface{} `json:"v2"`
}