// banyand/measure/measure_topn.go
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package measure
import (
"context"
"encoding/base64"
"fmt"
"io"
"strconv"
"strings"
"sync"
"time"
"github.com/pkg/errors"
"go.uber.org/multierr"
"golang.org/x/exp/slices"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/apache/skywalking-banyandb/api/common"
apiData "github.com/apache/skywalking-banyandb/api/data"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/flow"
"github.com/apache/skywalking-banyandb/pkg/flow/streaming"
"github.com/apache/skywalking-banyandb/pkg/flow/streaming/sources"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
)
const (
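// timeBucketFormat is the Go reference-time layout used to render a time bucket at minute granularity.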
timeBucketFormat = "200601021504"
// TopNTagFamily is the identity of the tag family that contains the topN calculated result.
TopNTagFamily = "__topN__"
)
var (
_ io.Closer = (*topNStreamingProcessor)(nil)
_ io.Closer = (*topNProcessorManager)(nil)
_ flow.Sink = (*topNStreamingProcessor)(nil)
// TopNValueFieldSpec denotes the field specification of the topN calculated result.
TopNValueFieldSpec = &databasev1.FieldSpec{
Name: "value",
FieldType: databasev1.FieldType_FIELD_TYPE_INT,
EncodingMethod: databasev1.EncodingMethod_ENCODING_METHOD_GORILLA,
CompressionMethod: databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD,
}
)
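// dataPointWithEntityValues wraps a written data point together with its encoded
// entity values, so that downstream operators can rebuild the series identity.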
type dataPointWithEntityValues struct {
*measurev1.DataPointValue
entityValues tsdb.EntityValues
}
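// topNStreamingProcessor consumes pre-processed data points from a streaming
// flow, ranks them within tumbling time windows, and writes each window's
// topN result back as data points of the companion measure.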
type topNStreamingProcessor struct {
m *measure
streamingFlow flow.Flow
l *logger.Logger
pipeline queue.Queue
topNSchema *databasev1.TopNAggregation
src chan interface{}
in chan flow.StreamRecord
errCh <-chan error
stopCh chan struct{}
flow.ComponentState
interval time.Duration
sortDirection modelv1.Sort
}
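// In returns the channel through which the upstream flow delivers windowed topN records to this sink.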
func (t *topNStreamingProcessor) In() chan<- flow.StreamRecord {
return t.in
}
func (t *topNStreamingProcessor) Setup(ctx context.Context) error {
t.Add(1)
go t.run(ctx)
return nil
}
func (t *topNStreamingProcessor) run(ctx context.Context) {
defer t.Done()
for {
select {
case record, ok := <-t.in:
if !ok {
return
}
// nolint: contextcheck
if err := t.writeStreamRecord(record); err != nil {
t.l.Err(err).Msg("fail to write stream record")
}
case <-ctx.Done():
return
}
}
}
// Teardown is called by the Flow as a lifecycle hook,
// so it must not block on the error channel within this method.
func (t *topNStreamingProcessor) Teardown(_ context.Context) error {
t.Wait()
return nil
}
func (t *topNStreamingProcessor) Close() error {
close(t.src)
// close streaming flow
err := t.streamingFlow.Close()
// and wait for error channel close
<-t.stopCh
t.stopCh = nil
return err
}
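// writeStreamRecord persists the topN tuples of a single window: the tuples
// arrive grouped by group key, and each (group, rank) pair is written as a
// separate data point.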
func (t *topNStreamingProcessor) writeStreamRecord(record flow.StreamRecord) error {
tuplesGroups, ok := record.Data().(map[string][]*streaming.Tuple2)
if !ok {
return errors.New("invalid data type")
}
// down-sample the start of the time window to a time bucket
eventTime := t.downSampleTimeBucket(record.TimestampMillis())
timeBucket := eventTime.Format(timeBucketFormat)
var err error
for group, tuples := range tuplesGroups {
if e := t.l.Debug(); e.Enabled() {
e.Str("TopN", t.topNSchema.GetMetadata().GetName()).
Str("group", group).
Int("rankNums", len(tuples)).
Msg("Write tuples")
}
for rankNum, tuple := range tuples {
fieldValue := tuple.V1.(int64)
data := tuple.V2.(flow.StreamRecord).Data().(flow.Data)
err = multierr.Append(err, t.writeData(eventTime, timeBucket, fieldValue, group, data, rankNum))
}
}
return err
}
func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket string, fieldValue int64,
group string, data flow.Data, rankNum int,
) error {
var tagValues []*modelv1.TagValue
if len(t.topNSchema.GetGroupByTagNames()) > 0 {
var ok bool
if tagValues, ok = data[3].([]*modelv1.TagValue); !ok {
return errors.New("fail to extract tag values from topN result")
}
}
entity, entityValues, shardID, err := t.locate(tagValues, rankNum)
if err != nil {
return err
}
// measureID consists of three parts:
// 1. groupValues
// 2. rankNumber
// 3. timeBucket
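// e.g. "svc1|inst1_0_202307220427" (illustrative values) for the group key
// "svc1|inst1" (group-by tag values joined with "|"), rank 0, and time
// bucket 202307220427.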
measureID := group + "_" + strconv.Itoa(rankNum) + "_" + timeBucket
iwr := &measurev1.InternalWriteRequest{
Request: &measurev1.WriteRequest{
Metadata: t.topNSchema.GetMetadata(),
DataPoint: &measurev1.DataPointValue{
Timestamp: timestamppb.New(eventTime),
TagFamilies: []*modelv1.TagFamilyForWrite{
{
Tags: append([]*modelv1.TagValue{
// MeasureID
{
Value: &modelv1.TagValue_Str{
Str: &modelv1.Str{
Value: measureID,
},
},
},
}, data[0].(tsdb.EntityValues)...),
},
},
Fields: []*modelv1.FieldValue{
{
Value: &modelv1.FieldValue_Int{
Int: &modelv1.Int{
Value: fieldValue,
},
},
},
},
},
},
ShardId: uint32(shardID),
SeriesHash: tsdb.HashEntity(entity),
}
if t.l.Debug().Enabled() {
iwr.EntityValues = entityValues.Encode()
}
message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), iwr)
_, errWritePub := t.pipeline.Publish(apiData.TopicMeasureWrite, message)
return errWritePub
}
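// downSampleTimeBucket truncates an event timestamp in epoch milliseconds to
// the start of its tumbling window. A minimal sketch of the truncation,
// assuming a one-minute interval (the timestamps are illustrative):
//
//	ts := int64(1690000065123)                      // 2023-07-22T04:27:45.123Z
//	ts -= ts % time.Minute.Milliseconds()           // 2023-07-22T04:27:00.000Z
//	_ = time.UnixMilli(ts).Format(timeBucketFormat) // "202307220427"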
func (t *topNStreamingProcessor) downSampleTimeBucket(eventTimeMillis int64) time.Time {
return time.UnixMilli(eventTimeMillis - eventTimeMillis%t.interval.Milliseconds())
}
func (t *topNStreamingProcessor) locate(tagValues []*modelv1.TagValue, rankNum int) (tsdb.Entity, tsdb.EntityValues, common.ShardID, error) {
if len(tagValues) != 0 && len(t.topNSchema.GetGroupByTagNames()) != len(tagValues) {
return nil, nil, 0, errors.New("no enough tag values for the entity")
}
// the entity is composed of, in order:
// 1) the entity prefix: source measure name + topN aggregation name
// 2) the sort direction
// 3) the rank number
// 4+) the group-by tag values, if any
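// e.g. with two group-by tags: [prefix, sortDirection, rankNum, tagValue0, tagValue1]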
entity := make(tsdb.EntityValues, 1+1+1+len(tagValues))
// entity prefix
entity[0] = tsdb.StrValue(formatMeasureCompanionPrefix(t.topNSchema.GetSourceMeasure().GetName(),
t.topNSchema.GetMetadata().GetName()))
entity[1] = tsdb.Int64Value(int64(t.sortDirection.Number()))
entity[2] = tsdb.Int64Value(int64(rankNum))
// the group-by tag values complete the entity, which serves as the sharding key
for idx, tagVal := range tagValues {
entity[idx+3] = tagVal
}
e, err := entity.ToEntity()
if err != nil {
return nil, nil, 0, err
}
id, err := partition.ShardID(e.Marshal(), t.m.shardNum)
if err != nil {
return nil, nil, 0, err
}
return e, entity, common.ShardID(id), nil
}
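// start opens the streaming pipeline: records are bucketed into tumbling time
// windows of the configured interval, ranked per group key up to the schema's
// counter number, and every window's topN tuples are emitted to this
// processor, which acts as the sink.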
func (t *topNStreamingProcessor) start() *topNStreamingProcessor {
t.errCh = t.streamingFlow.Window(streaming.NewTumblingTimeWindows(t.interval)).
AllowedMaxWindows(int(t.topNSchema.GetLruSize())).
TopN(int(t.topNSchema.GetCountersNumber()),
streaming.WithSortKeyExtractor(func(record flow.StreamRecord) int64 {
return record.Data().(flow.Data)[2].(int64)
}),
orderBy(t.sortDirection),
streaming.WithGroupKeyExtractor(func(record flow.StreamRecord) string {
return record.Data().(flow.Data)[1].(string)
}),
).To(t).Open()
go t.handleError()
return t
}
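// orderBy translates a model sort into a streaming topN ordering option; any
// value other than ascending falls back to descending.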
func orderBy(sort modelv1.Sort) streaming.TopNOption {
if sort == modelv1.Sort_SORT_ASC {
return streaming.OrderBy(streaming.ASC)
}
return streaming.OrderBy(streaming.DESC)
}
func (t *topNStreamingProcessor) handleError() {
for err := range t.errCh {
t.l.Err(err).Str("topN", t.topNSchema.GetMetadata().GetName()).
Msg("error occurred during flow setup or process")
}
t.stopCh <- struct{}{}
}
// topNProcessorManager manages multiple topNStreamingProcessor(s) belonging to a single measure.
type topNProcessorManager struct {
l *logger.Logger
pipeline queue.Queue
m *measure
s logical.TagSpecRegistry
processorMap map[*commonv1.Metadata][]*topNStreamingProcessor
topNSchemas []*databasev1.TopNAggregation
sync.RWMutex
}
func (manager *topNProcessorManager) Close() error {
manager.Lock()
defer manager.Unlock()
var err error
for _, processorList := range manager.processorMap {
for _, processor := range processorList {
err = multierr.Append(err, processor.Close())
}
}
return err
}
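// onMeasureWrite asynchronously fans an incoming write request out to the
// source channel of every registered processor.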
func (manager *topNProcessorManager) onMeasureWrite(request *measurev1.InternalWriteRequest) {
go func() {
manager.RLock()
defer manager.RUnlock()
for _, processorList := range manager.processorMap {
for _, processor := range processorList {
processor.src <- flow.NewStreamRecordWithTimestampPb(&dataPointWithEntityValues{
request.GetRequest().GetDataPoint(),
request.GetEntityValues(),
}, request.GetRequest().GetDataPoint().GetTimestamp())
}
}
}()
}
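// start builds one processor per topN schema and sort direction. A schema
// whose sort is unspecified gets both an ascending and a descending
// processor, so queries in either order can be served.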
func (manager *topNProcessorManager) start() error {
interval := manager.m.interval
for _, topNSchema := range manager.topNSchemas {
sortDirections := make([]modelv1.Sort, 0, 2)
if topNSchema.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
sortDirections = append(sortDirections, modelv1.Sort_SORT_ASC, modelv1.Sort_SORT_DESC)
} else {
sortDirections = append(sortDirections, topNSchema.GetFieldValueSort())
}
processorList := make([]*topNStreamingProcessor, len(sortDirections))
for i, sortDirection := range sortDirections {
srcCh := make(chan interface{})
src, _ := sources.NewChannel(srcCh)
streamingFlow := streaming.New(src)
filters, buildErr := manager.buildFilter(topNSchema.GetCriteria())
if buildErr != nil {
return buildErr
}
streamingFlow = streamingFlow.Filter(filters)
mapper, innerErr := manager.buildMapper(topNSchema.GetFieldName(), topNSchema.GetGroupByTagNames()...)
if innerErr != nil {
return innerErr
}
streamingFlow = streamingFlow.Map(mapper)
processor := &topNStreamingProcessor{
m: manager.m,
l: manager.l,
interval: interval,
topNSchema: topNSchema,
sortDirection: sortDirection,
src: srcCh,
in: make(chan flow.StreamRecord),
stopCh: make(chan struct{}),
streamingFlow: streamingFlow,
pipeline: manager.pipeline,
}
processorList[i] = processor.start()
}
manager.processorMap[topNSchema.GetSourceMeasure()] = processorList
}
return nil
}
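// buildFilter compiles the schema's criteria into a flow predicate; a nil
// criteria lets every element pass.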
func (manager *topNProcessorManager) buildFilter(criteria *modelv1.Criteria) (flow.UnaryFunc[bool], error) {
// if criteria is nil, we handle all incoming elements
if criteria == nil {
return func(_ context.Context, dataPoint any) bool {
return true
}, nil
}
f, err := logical.BuildSimpleTagFilter(criteria)
if err != nil {
return nil, err
}
return func(_ context.Context, request any) bool {
tffws := request.(*dataPointWithEntityValues).GetTagFamilies()
ok, matchErr := f.Match(logical.TagFamiliesForWrite(tffws), manager.s)
if matchErr != nil {
manager.l.Err(matchErr).Msg("fail to match criteria")
return false
}
return ok
}, nil
}
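// buildMapper projects a data point into a flow.Data tuple whose layout the
// downstream operators rely on:
// [0] entity values (identity), [1] group key string, [2] int64 field value,
// [3] group-by tag values (nil when no group-by tags are configured).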
func (manager *topNProcessorManager) buildMapper(fieldName string, groupByNames ...string) (flow.UnaryFunc[any], error) {
fieldIdx := slices.IndexFunc(manager.m.GetSchema().GetFields(), func(spec *databasev1.FieldSpec) bool {
return spec.GetName() == fieldName
})
if fieldIdx == -1 {
return nil, errors.New("invalid fieldName")
}
if len(groupByNames) == 0 {
return func(_ context.Context, request any) any {
dpWithEvs := request.(*dataPointWithEntityValues)
if len(dpWithEvs.GetFields()) <= fieldIdx {
manager.l.Warn().Interface("point", dpWithEvs.DataPointValue).
Str("fieldName", fieldName).
Int("len", len(dpWithEvs.GetFields())).
Int("fieldIdx", fieldIdx).
Msg("out of range")
// Guard the out-of-range index below: emit a zero-valued tuple so the
// flow keeps running instead of panicking on the missing field.
return flow.Data{dpWithEvs.entityValues, "", int64(0), nil}
}
return flow.Data{
// EntityValues as identity
dpWithEvs.entityValues,
// save string representation of group values as the key, i.e. v1
"",
// field value as v2
// TODO: we only support int64
dpWithEvs.GetFields()[fieldIdx].GetInt().GetValue(),
// groupBy tag values as v3
nil,
}
}, nil
}
groupLocator, err := newGroupLocator(manager.m.GetSchema(), groupByNames)
if err != nil {
return nil, err
}
return func(_ context.Context, request any) any {
dpWithEvs := request.(*dataPointWithEntityValues)
return flow.Data{
// EntityValues as identity
dpWithEvs.entityValues,
// save string representation of group values as the key, i.e. v1
strings.Join(transform(groupLocator, func(locator partition.TagLocator) string {
return stringify(extractTagValue(dpWithEvs.DataPointValue, locator))
}), "|"),
// field value as v2
// TODO: we only support int64
dpWithEvs.GetFields()[fieldIdx].GetInt().GetValue(),
// groupBy tag values as v3
transform(groupLocator, func(locator partition.TagLocator) *modelv1.TagValue {
return extractTagValue(dpWithEvs.DataPointValue, locator)
}),
}
}, nil
}
// groupTagsLocator can be used to locate tags within families.
type groupTagsLocator []partition.TagLocator
// newGroupLocator generates a groupTagsLocator that strictly preserves the order of groupByNames.
func newGroupLocator(m *databasev1.Measure, groupByNames []string) (groupTagsLocator, error) {
groupTags := make([]partition.TagLocator, 0, len(groupByNames))
for _, groupByName := range groupByNames {
fIdx, tIdx, spec := pbv1.FindTagByName(m.GetTagFamilies(), groupByName)
if spec == nil {
return nil, fmt.Errorf("tag %s is not found", groupByName)
}
groupTags = append(groupTags, partition.TagLocator{
FamilyOffset: fIdx,
TagOffset: tIdx,
})
}
return groupTags, nil
}
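// extractTagValue looks a tag up by its locator, returning a null tag value
// when the locator points beyond the written tag families or tags.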
func extractTagValue(dpv *measurev1.DataPointValue, locator partition.TagLocator) *modelv1.TagValue {
if locator.FamilyOffset >= len(dpv.GetTagFamilies()) {
return &modelv1.TagValue{Value: &modelv1.TagValue_Null{}}
}
tagFamily := dpv.GetTagFamilies()[locator.FamilyOffset]
if locator.TagOffset >= len(tagFamily.GetTags()) {
return &modelv1.TagValue{Value: &modelv1.TagValue_Null{}}
}
return tagFamily.GetTags()[locator.TagOffset]
}
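// stringify renders a tag value as a string for building group keys, e.g. an
// int array [1, 2] becomes "1,2"; unsupported types yield an empty string.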
func stringify(tagValue *modelv1.TagValue) string {
switch v := tagValue.GetValue().(type) {
case *modelv1.TagValue_Str:
return v.Str.GetValue()
case *modelv1.TagValue_Int:
return strconv.FormatInt(v.Int.GetValue(), 10)
case *modelv1.TagValue_BinaryData:
return base64.StdEncoding.EncodeToString(v.BinaryData)
case *modelv1.TagValue_IntArray:
return strings.Join(transform(v.IntArray.GetValue(), func(num int64) string {
return strconv.FormatInt(num, 10)
}), ",")
case *modelv1.TagValue_StrArray:
return strings.Join(v.StrArray.GetValue(), ",")
default:
return ""
}
}
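// transform maps each element of input through mapper, preserving order, e.g.
// transform([]int{1, 2}, strconv.Itoa) yields []string{"1", "2"}.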
func transform[I, O any](input []I, mapper func(I) O) []O {
output := make([]O, len(input))
for i := range input {
output[i] = mapper(input[i])
}
return output
}