// banyand/measure/topn.go

// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package measure

import (
"bytes"
"context"
"encoding/base64"
"encoding/hex"
"fmt"
"io"
"strconv"
"strings"
"sync"
"time"
"github.com/pkg/errors"
"go.uber.org/multierr"
"golang.org/x/exp/slices"
"google.golang.org/protobuf/types/known/timestamppb"
apiData "github.com/apache/skywalking-banyandb/api/data"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/flow"
"github.com/apache/skywalking-banyandb/pkg/flow/streaming"
"github.com/apache/skywalking-banyandb/pkg/flow/streaming/sources"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
)
const (
timeBucketFormat = "200601021504"
resultPersistencyTimeout = 10 * time.Second
maxFlushInterval = time.Minute
)
var (
_ io.Closer = (*topNStreamingProcessor)(nil)
_ io.Closer = (*topNProcessorManager)(nil)
_ flow.Sink = (*topNStreamingProcessor)(nil)
)
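
// getStreamingManager returns the topNProcessorManager for the given source
// measure, creating one when absent. If the source measure is not loaded yet,
// a placeholder manager is stored so TopN tasks can be registered before the
// measure arrives; if an existing manager holds a stale schema (an older mod
// revision), a new manager inherits its registered tasks and the stale one is
// closed.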
func (sr *schemaRepo) getStreamingManager(source *commonv1.Metadata, pipeline queue.Queue) (manager *topNProcessorManager) {
key := getKey(source)
sourceMeasure, ok := sr.loadMeasure(source)
if !ok {
m, _ := sr.topNProcessorMap.LoadOrStore(key, &topNProcessorManager{
l: sr.l,
pipeline: pipeline,
})
manager = m.(*topNProcessorManager)
return manager
}
if v, ok := sr.topNProcessorMap.Load(key); ok {
pre := v.(*topNProcessorManager)
pre.init(sourceMeasure)
if pre.m.schema.GetMetadata().GetModRevision() < sourceMeasure.schema.GetMetadata().GetModRevision() {
defer pre.Close()
manager = &topNProcessorManager{
l: sr.l,
pipeline: pipeline,
}
manager.registeredTasks = append(manager.registeredTasks, pre.registeredTasks...)
} else {
return pre
}
}
if manager == nil {
manager = &topNProcessorManager{
l: sr.l,
pipeline: pipeline,
}
}
manager.init(sourceMeasure)
sr.topNProcessorMap.Store(key, manager)
return manager
}
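
// stopStreamingManager closes and removes the manager registered for the given
// source measure, if any.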
func (sr *schemaRepo) stopStreamingManager(sourceMeasure *commonv1.Metadata) {
key := getKey(sourceMeasure)
if v, ok := sr.topNProcessorMap.Load(key); ok {
v.(*topNProcessorManager).Close()
sr.topNProcessorMap.Delete(key)
}
}
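
// dataPointWithEntityValues bundles a written data point with its entity tag
// values and the routing information (series ID and shard ID) it was written with.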
type dataPointWithEntityValues struct {
*measurev1.DataPointValue
entityValues []*modelv1.TagValue
seriesID uint64
shardID uint32
}
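
// topNStreamingProcessor consumes data points from a source channel, runs them
// through a windowed streaming TopN flow, and persists each window's results
// back through the measure write pipeline.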
type topNStreamingProcessor struct {
pipeline queue.Queue
streamingFlow flow.Flow
in chan flow.StreamRecord
l *logger.Logger
topNSchema *databasev1.TopNAggregation
src chan interface{}
m *measure
errCh <-chan error
stopCh chan struct{}
flow.ComponentState
interval time.Duration
sortDirection modelv1.Sort
}
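
// In returns the channel through which the streaming flow pushes aggregated
// window records into this sink.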
func (t *topNStreamingProcessor) In() chan<- flow.StreamRecord {
return t.in
}
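
// Setup implements the flow.Sink lifecycle by launching the record-draining loop.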
func (t *topNStreamingProcessor) Setup(ctx context.Context) error {
t.Add(1)
go t.run(ctx)
return nil
}
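
// run drains aggregated records until the input channel closes or the context
// is canceled, persisting each record as a TopN result.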
func (t *topNStreamingProcessor) run(ctx context.Context) {
defer t.Done()
buf := make([]byte, 0, 64)
for {
select {
case record, ok := <-t.in:
if !ok {
return
}
// nolint: contextcheck
if err := t.writeStreamRecord(record, buf); err != nil {
t.l.Err(err).Msg("fail to write stream record")
}
case <-ctx.Done():
return
}
}
}
// Teardown is called by the Flow as a lifecycle hook,
// so it must not block on the error channel.
func (t *topNStreamingProcessor) Teardown(_ context.Context) error {
t.Wait()
return nil
}
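
// Close shuts down the source channel and the streaming flow, then waits for
// handleError to drain the error channel and signal stopCh.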
func (t *topNStreamingProcessor) Close() error {
close(t.src)
// close streaming flow
err := t.streamingFlow.Close()
// and wait for error channel close
<-t.stopCh
t.stopCh = nil
return err
}
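
// writeStreamRecord converts one window's grouped TopN tuples into
// InternalWriteRequests against the TopN result measure and publishes them
// through a batch publisher. Each result row is keyed by the entity
// (topN rule name, sort direction, group key) and carries the ranked values
// marshaled into a single binary field via TopNValue.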
func (t *topNStreamingProcessor) writeStreamRecord(record flow.StreamRecord, buf []byte) error {
tuplesGroups, ok := record.Data().(map[string][]*streaming.Tuple2)
if !ok {
return errors.New("invalid data type")
}
// down-sample the start of the timeWindow to a time-bucket
eventTime := t.downSampleTimeBucket(record.TimestampMillis())
var err error
publisher := t.pipeline.NewBatchPublisher(resultPersistencyTimeout)
defer publisher.Close()
topNValue := GenerateTopNValue()
defer ReleaseTopNValue(topNValue)
for group, tuples := range tuplesGroups {
if e := t.l.Debug(); e.Enabled() {
for i := range tuples {
tuple := tuples[i]
data := tuple.V2.(flow.StreamRecord).Data().(flow.Data)
e.
Int("rankNums", i+1).
Str("entityValues", fmt.Sprintf("%v", data[0])).
Int("value", int(data[2].(int64))).
Time("eventTime", eventTime).
Msgf("Write tuples %s %s", t.topNSchema.GetMetadata().GetName(), group)
}
}
topNValue.Reset()
topNValue.setMetadata(t.topNSchema.GetFieldName(), t.m.schema.Entity.TagNames)
var shardID uint32
for _, tuple := range tuples {
data := tuple.V2.(flow.StreamRecord).Data().(flow.Data)
topNValue.addValue(
tuple.V1.(int64),
data[0].([]*modelv1.TagValue),
)
shardID = data[3].(uint32)
}
entityValues := []*modelv1.TagValue{
{
Value: &modelv1.TagValue_Str{
Str: &modelv1.Str{
Value: t.topNSchema.GetMetadata().GetName(),
},
},
},
{
Value: &modelv1.TagValue_Int{
Int: &modelv1.Int{
Value: int64(t.sortDirection),
},
},
},
{
Value: &modelv1.TagValue_Str{
Str: &modelv1.Str{
Value: group,
},
},
},
}
buf = buf[:0]
if buf, err = topNValue.marshal(buf); err != nil {
return err
}
iwr := &measurev1.InternalWriteRequest{
Request: &measurev1.WriteRequest{
MessageId: uint64(time.Now().UnixNano()),
Metadata: &commonv1.Metadata{Name: TopNSchemaName, Group: t.topNSchema.GetMetadata().Group},
DataPoint: &measurev1.DataPointValue{
Timestamp: timestamppb.New(eventTime),
TagFamilies: []*modelv1.TagFamilyForWrite{
{Tags: entityValues},
},
Fields: []*modelv1.FieldValue{
{
Value: &modelv1.FieldValue_BinaryData{
BinaryData: bytes.Clone(buf),
},
},
},
},
},
EntityValues: entityValues,
ShardId: shardID,
}
message := bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), "local", iwr)
_, err = publisher.Publish(context.TODO(), apiData.TopicMeasureWrite, message)
if err != nil {
return err
}
}
return err
}
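
// downSampleTimeBucket truncates the event timestamp to the start of its
// aggregation interval; e.g. with a 1m interval, 12:34:56.789 maps to 12:34:00.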
func (t *topNStreamingProcessor) downSampleTimeBucket(eventTimeMillis int64) time.Time {
return time.UnixMilli(eventTimeMillis - eventTimeMillis%t.interval.Milliseconds())
}
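
// start opens the streaming flow: tumbling time windows sized by the measure
// interval (with the flush interval capped at maxFlushInterval), an LRU bound
// on in-flight windows, and a TopN operator keyed by series ID, sorted by the
// field value, and grouped by the group key, sinking into this processor.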
func (t *topNStreamingProcessor) start() *topNStreamingProcessor {
flushInterval := t.interval
if flushInterval > maxFlushInterval {
flushInterval = maxFlushInterval
}
t.errCh = t.streamingFlow.Window(streaming.NewTumblingTimeWindows(t.interval, flushInterval)).
AllowedMaxWindows(int(t.topNSchema.GetLruSize())).
TopN(int(t.topNSchema.GetCountersNumber()),
streaming.WithKeyExtractor(func(record flow.StreamRecord) uint64 {
return record.Data().(flow.Data)[4].(uint64)
}),
streaming.WithSortKeyExtractor(func(record flow.StreamRecord) int64 {
return record.Data().(flow.Data)[2].(int64)
}),
orderBy(t.topNSchema.GetFieldValueSort()),
streaming.WithGroupKeyExtractor(func(record flow.StreamRecord) string {
return record.Data().(flow.Data)[1].(string)
}),
).To(t).Open()
go t.handleError()
return t
}
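
// orderBy maps the schema sort direction to a streaming TopN ordering,
// defaulting to descending.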
func orderBy(sort modelv1.Sort) streaming.TopNOption {
if sort == modelv1.Sort_SORT_ASC {
return streaming.OrderBy(streaming.ASC)
}
return streaming.OrderBy(streaming.DESC)
}
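
// handleError logs flow errors until the error channel closes, then signals
// Close via stopCh.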
func (t *topNStreamingProcessor) handleError() {
for err := range t.errCh {
t.l.Err(err).Str("topN", t.topNSchema.GetMetadata().GetName()).
Msg("error occurred during flow setup or process")
}
t.stopCh <- struct{}{}
}
// topNProcessorManager manages multiple topNStreamingProcessor(s) belonging to a single measure.
type topNProcessorManager struct {
l *logger.Logger
pipeline queue.Queue
m *measure
s logical.TagSpecRegistry
registeredTasks []*databasev1.TopNAggregation
processorList []*topNStreamingProcessor
sync.RWMutex
}
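
// init binds the manager to its source measure exactly once, registers the
// measure's tag families for criteria matching, and starts processors for all
// tasks registered so far.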
func (manager *topNProcessorManager) init(m *measure) {
manager.Lock()
defer manager.Unlock()
if manager.m != nil {
return
}
manager.m = m
tagMapSpec := logical.TagSpecMap{}
tagMapSpec.RegisterTagFamilies(m.schema.GetTagFamilies())
manager.s = tagMapSpec
for i := range manager.registeredTasks {
if err := manager.start(manager.registeredTasks[i]); err != nil {
manager.l.Err(err).Msg("fail to start processor")
}
}
}
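
// Close stops every processor and drops the registered tasks.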
func (manager *topNProcessorManager) Close() error {
manager.Lock()
defer manager.Unlock()
var err error
for _, processor := range manager.processorList {
err = multierr.Append(err, processor.Close())
}
manager.processorList = nil
manager.registeredTasks = nil
return err
}
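
// onMeasureWrite asynchronously fans a measure write out to all streaming
// processors, lazily initializing the manager if the source measure was not
// available when the manager was created.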
func (manager *topNProcessorManager) onMeasureWrite(seriesID uint64, shardID uint32, request *measurev1.InternalWriteRequest, measure *measure) {
go func() {
manager.RLock()
defer manager.RUnlock()
if manager.m == nil {
manager.RUnlock()
manager.init(measure)
manager.RLock()
}
for _, processor := range manager.processorList {
processor.src <- flow.NewStreamRecordWithTimestampPb(&dataPointWithEntityValues{
request.GetRequest().GetDataPoint(),
request.GetEntityValues(),
seriesID,
shardID,
}, request.GetRequest().GetDataPoint().GetTimestamp())
}
}()
}
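
// register adds a TopN task to the manager. When a task with the same name
// arrives with a newer mod revision, new processors are started from the
// updated schema before the old ones are closed.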
func (manager *topNProcessorManager) register(topNSchema *databasev1.TopNAggregation) {
manager.Lock()
defer manager.Unlock()
exist := false
for i := range manager.registeredTasks {
if manager.registeredTasks[i].GetMetadata().GetName() == topNSchema.GetMetadata().GetName() {
exist = true
if manager.registeredTasks[i].GetMetadata().GetModRevision() < topNSchema.GetMetadata().GetModRevision() {
prev := manager.registeredTasks[i]
prevProcessors := manager.removeProcessors(prev)
if err := manager.start(topNSchema); err != nil {
manager.l.Err(err).Msg("fail to start the new processor")
return
}
manager.registeredTasks[i] = topNSchema
for _, processor := range prevProcessors {
if err := processor.Close(); err != nil {
manager.l.Err(err).Msg("fail to close the prev processor")
}
}
}
}
}
if exist {
return
}
manager.registeredTasks = append(manager.registeredTasks, topNSchema)
if err := manager.start(topNSchema); err != nil {
manager.l.Err(err).Msg("fail to start processor")
}
}
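
// start builds one streaming processor per sort direction (both ASC and DESC
// when the schema leaves the sort unspecified), each with a filter compiled
// from the task criteria and a mapper that projects data points into flow.Data.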
func (manager *topNProcessorManager) start(topNSchema *databasev1.TopNAggregation) error {
if manager.m == nil {
return nil
}
interval := manager.m.interval
sortDirections := make([]modelv1.Sort, 0, 2)
if topNSchema.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
sortDirections = append(sortDirections, modelv1.Sort_SORT_ASC, modelv1.Sort_SORT_DESC)
} else {
sortDirections = append(sortDirections, topNSchema.GetFieldValueSort())
}
processorList := make([]*topNStreamingProcessor, len(sortDirections))
for i, sortDirection := range sortDirections {
srcCh := make(chan interface{})
src, _ := sources.NewChannel(srcCh)
name := strings.Join([]string{topNSchema.GetMetadata().Group, topNSchema.GetMetadata().Name, modelv1.Sort_name[int32(sortDirection)]}, "-")
streamingFlow := streaming.New(name, src)
filters, buildErr := manager.buildFilter(topNSchema.GetCriteria())
if buildErr != nil {
return buildErr
}
streamingFlow = streamingFlow.Filter(filters)
mapper, innerErr := manager.buildMapper(topNSchema.GetFieldName(), topNSchema.GetGroupByTagNames()...)
if innerErr != nil {
return innerErr
}
streamingFlow = streamingFlow.Map(mapper)
processor := &topNStreamingProcessor{
m: manager.m,
l: manager.l,
interval: interval,
topNSchema: topNSchema,
sortDirection: sortDirection,
src: srcCh,
in: make(chan flow.StreamRecord),
stopCh: make(chan struct{}),
streamingFlow: streamingFlow,
pipeline: manager.pipeline,
}
processorList[i] = processor.start()
}
manager.processorList = append(manager.processorList, processorList...)
return nil
}
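
// removeProcessors detaches and returns the processors belonging to the given
// TopN task, keeping the rest.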
func (manager *topNProcessorManager) removeProcessors(topNSchema *databasev1.TopNAggregation) []*topNStreamingProcessor {
var processors []*topNStreamingProcessor
var newList []*topNStreamingProcessor
for i := range manager.processorList {
if manager.processorList[i].topNSchema.GetMetadata().GetName() == topNSchema.GetMetadata().GetName() {
processors = append(processors, manager.processorList[i])
} else {
newList = append(newList, manager.processorList[i])
}
}
manager.processorList = newList
return processors
}
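
// buildFilter compiles the task criteria into a predicate over incoming data
// points. A nil criteria accepts everything; match errors are logged and
// treated as non-matches.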
func (manager *topNProcessorManager) buildFilter(criteria *modelv1.Criteria) (flow.UnaryFunc[bool], error) {
// if criteria is nil, we handle all incoming elements
if criteria == nil {
return func(_ context.Context, _ any) bool {
return true
}, nil
}
f, err := logical.BuildSimpleTagFilter(criteria)
if err != nil {
return nil, err
}
return func(_ context.Context, request any) bool {
tffws := request.(*dataPointWithEntityValues).GetTagFamilies()
ok, matchErr := f.Match(logical.TagFamiliesForWrite(tffws), manager.s)
if matchErr != nil {
manager.l.Err(matchErr).Msg("fail to match criteria")
return false
}
return ok
}, nil
}
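
// buildMapper projects a data point into the flow.Data tuple consumed by the
// TopN operator: [entityValues, groupKey, fieldValue, shardID, seriesID].
// Without group-by tags the group key is left empty.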
func (manager *topNProcessorManager) buildMapper(fieldName string, groupByNames ...string) (flow.UnaryFunc[any], error) {
fieldIdx := slices.IndexFunc(manager.m.GetSchema().GetFields(), func(spec *databasev1.FieldSpec) bool {
return spec.GetName() == fieldName
})
if fieldIdx == -1 {
return nil, fmt.Errorf("field %s is not found in %s schema", fieldName, manager.m.GetSchema().GetMetadata().GetName())
}
if len(groupByNames) == 0 {
return func(_ context.Context, request any) any {
dpWithEvs := request.(*dataPointWithEntityValues)
if len(dpWithEvs.GetFields()) <= fieldIdx {
manager.l.Warn().Interface("point", dpWithEvs.DataPointValue).
Str("fieldName", fieldName).
Int("len", len(dpWithEvs.GetFields())).
Int("fieldIdx", fieldIdx).
Msg("out of range")
}
return flow.Data{
// EntityValues as identity
dpWithEvs.entityValues,
// no group-by tags, so the group key (v1) is empty
"",
// field value as v2
dpWithEvs.GetFields()[fieldIdx].GetInt().GetValue(),
// shardID values as v3
dpWithEvs.shardID,
// seriesID values as v4
dpWithEvs.seriesID,
}
}, nil
}
groupLocator, err := newGroupLocator(manager.m.GetSchema(), groupByNames)
if err != nil {
return nil, err
}
return func(_ context.Context, request any) any {
dpWithEvs := request.(*dataPointWithEntityValues)
return flow.Data{
// EntityValues as identity
dpWithEvs.entityValues,
// save string representation of group values as the key, i.e. v1
GroupName(transform(groupLocator, func(locator partition.TagLocator) string {
return Stringify(extractTagValue(dpWithEvs.DataPointValue, locator))
})),
// field value as v2
dpWithEvs.GetFields()[fieldIdx].GetInt().GetValue(),
// shardID values as v3
dpWithEvs.shardID,
// seriesID values as v4
dpWithEvs.seriesID,
}
}, nil
}
// groupTagsLocator can be used to locate tags within families.
type groupTagsLocator []partition.TagLocator
// newGroupLocator generates a groupTagsLocator which strictly preserves the order of groupByNames.
func newGroupLocator(m *databasev1.Measure, groupByNames []string) (groupTagsLocator, error) {
groupTags := make([]partition.TagLocator, 0, len(groupByNames))
for _, groupByName := range groupByNames {
fIdx, tIdx, spec := pbv1.FindTagByName(m.GetTagFamilies(), groupByName)
if spec == nil {
return nil, fmt.Errorf("tag %s is not found", groupByName)
}
groupTags = append(groupTags, partition.TagLocator{
FamilyOffset: fIdx,
TagOffset: tIdx,
})
}
return groupTags, nil
}
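
// extractTagValue looks up a tag value by its locator, returning a null value
// when the locator is out of range.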
func extractTagValue(dpv *measurev1.DataPointValue, locator partition.TagLocator) *modelv1.TagValue {
if locator.FamilyOffset >= len(dpv.GetTagFamilies()) {
return &modelv1.TagValue{Value: &modelv1.TagValue_Null{}}
}
tagFamily := dpv.GetTagFamilies()[locator.FamilyOffset]
if locator.TagOffset >= len(tagFamily.GetTags()) {
return &modelv1.TagValue{Value: &modelv1.TagValue_Null{}}
}
return tagFamily.GetTags()[locator.TagOffset]
}
// Stringify converts a TagValue to a string, rendering arrays as
// comma-separated items and binary data as base64; unsupported types yield "".
func Stringify(tagValue *modelv1.TagValue) string {
switch v := tagValue.GetValue().(type) {
case *modelv1.TagValue_Str:
return v.Str.GetValue()
case *modelv1.TagValue_Int:
return strconv.FormatInt(v.Int.GetValue(), 10)
case *modelv1.TagValue_BinaryData:
return base64.StdEncoding.EncodeToString(v.BinaryData)
case *modelv1.TagValue_IntArray:
return strings.Join(transform(v.IntArray.GetValue(), func(num int64) string {
return strconv.FormatInt(num, 10)
}), ",")
case *modelv1.TagValue_StrArray:
return strings.Join(v.StrArray.GetValue(), ",")
default:
return ""
}
}
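
// transform maps a slice through mapper, preserving order.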
func transform[I, O any](input []I, mapper func(I) O) []O {
output := make([]O, len(input))
for i := range input {
output[i] = mapper(input[i])
}
return output
}
// GenerateTopNValue returns a new TopNValue instance.
func GenerateTopNValue() *TopNValue {
v := topNValuePool.Get()
if v == nil {
return &TopNValue{}
}
return v
}
// ReleaseTopNValue releases a TopNValue instance.
func ReleaseTopNValue(bi *TopNValue) {
bi.Reset()
topNValuePool.Put(bi)
}
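
// A typical round trip within this package, sketched; entityTagValues and
// decoder are placeholders for the entity tag values being ranked and an
// *encoding.BytesBlockDecoder:
//
//	v := GenerateTopNValue()
//	defer ReleaseTopNValue(v)
//	v.setMetadata("value", []string{"service", "instance"})
//	v.addValue(42, entityTagValues)
//	buf, err := v.marshal(nil)
//	// ... persist buf; later, read it back ...
//	out := GenerateTopNValue()
//	err = out.Unmarshal(buf, decoder)
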
var topNValuePool = pool.Register[*TopNValue]("measure-topNValue")
// TopNValue is a reusable, pooled container for one group's ranked values and
// their entity tag values; marshal and Unmarshal define its binary encoding as
// a measure field.
type TopNValue struct {
valueName string
entityTagNames []string
values []int64
entities [][]*modelv1.TagValue
entityValues [][]byte
entityValuesBuf [][]byte
buf []byte
encodeType encoding.EncodeType
firstValue int64
}
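
// setMetadata records the name of the ranked field and the entity tag names of
// the source measure.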
func (t *TopNValue) setMetadata(valueName string, entityTagNames []string) {
t.valueName = valueName
t.entityTagNames = entityTagNames
}
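
// addValue appends one ranked value together with its entity tag values.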
func (t *TopNValue) addValue(value int64, entityValues []*modelv1.TagValue) {
t.values = append(t.values, value)
t.entities = append(t.entities, entityValues)
}
// Values returns the valueName, entityTagNames, values, and entities.
func (t *TopNValue) Values() (string, []string, []int64, [][]*modelv1.TagValue) {
return t.valueName, t.entityTagNames, t.values, t.entities
}
// Reset resets the TopNValue.
func (t *TopNValue) Reset() {
t.valueName = ""
t.entityTagNames = t.entityTagNames[:0]
t.values = t.values[:0]
for i := range t.entities {
t.entities[i] = t.entities[i][:0]
}
t.entities = t.entities[:0]
t.buf = t.buf[:0]
t.encodeType = encoding.EncodeTypeUnknown
t.firstValue = 0
t.entityValuesBuf = t.entityValuesBuf[:0]
t.entityValues = t.entityValues[:0]
}
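
// resizeEntityValues grows the reusable entityValues buffer to size entries,
// reusing existing capacity where possible.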
func (t *TopNValue) resizeEntityValues(size int) [][]byte {
entityValues := t.entityValues
if n := size - cap(entityValues); n > 0 {
entityValues = append(entityValues[:cap(entityValues)], make([][]byte, n)...)
}
t.entityValues = entityValues[:size]
return t.entityValues
}
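
// resizeEntities grows the entities buffer to size rows, each reset to zero
// length with capacity for at least entitySize tag values.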
func (t *TopNValue) resizeEntities(size int, entitySize int) [][]*modelv1.TagValue {
entities := t.entities
if n := size - cap(t.entities); n > 0 {
entities = append(entities[:cap(entities)], make([][]*modelv1.TagValue, n)...)
}
t.entities = entities[:size]
for i := range t.entities {
entity := t.entities[i]
if n := entitySize - cap(entity); n > 0 {
entity = append(entity[:cap(entity)], make([]*modelv1.TagValue, n)...)
}
t.entities[i] = entity[:0]
}
return t.entities
}
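
// marshal appends the TopNValue to dst using the following layout:
//
//	valueName | count(entityTagNames) | entityTagNames... | count(values) |
//	encodeType | firstValue | len(encodedValues) | encodedValues |
//	entityValues (bytes block)
//
// Values are compressed with encoding.Int64ListToBytes, and every entity's tag
// values are marshaled with pbv1.MarshalTagValues.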
func (t *TopNValue) marshal(dst []byte) ([]byte, error) {
if len(t.values) == 0 {
return nil, errors.New("values is empty")
}
dst = encoding.EncodeBytes(dst, convert.StringToBytes(t.valueName))
dst = encoding.VarUint64ToBytes(dst, uint64(len(t.entityTagNames)))
for _, entityTagName := range t.entityTagNames {
dst = encoding.EncodeBytes(dst, convert.StringToBytes(entityTagName))
}
dst = encoding.VarUint64ToBytes(dst, uint64(len(t.values)))
t.buf, t.encodeType, t.firstValue = encoding.Int64ListToBytes(t.buf, t.values)
dst = append(dst, byte(t.encodeType))
dst = encoding.VarInt64ToBytes(dst, t.firstValue)
dst = encoding.VarUint64ToBytes(dst, uint64(len(t.buf)))
dst = append(dst, t.buf...)
var err error
evv := t.resizeEntityValues(len(t.entities))
for i, tvv := range t.entities {
ev := evv[i]
ev, err = pbv1.MarshalTagValues(ev[:0], tvv)
if err != nil {
return nil, err
}
evv[i] = ev
}
dst = encoding.EncodeBytesBlock(dst, evv)
return dst, nil
}
// Unmarshal decodes a TopNValue produced by marshal from src, using decoder to
// decode the entity-values bytes block.
func (t *TopNValue) Unmarshal(src []byte, decoder *encoding.BytesBlockDecoder) error {
var err error
src, nameBytes, err := encoding.DecodeBytes(src)
if err != nil {
return fmt.Errorf("cannot unmarshal topNValue.name: %w", err)
}
t.valueName = convert.BytesToString(nameBytes)
var entityTagNamesCount uint64
src, entityTagNamesCount = encoding.BytesToVarUint64(src)
t.entityTagNames = make([]string, 0, entityTagNamesCount)
var entityTagNameBytes []byte
for i := uint64(0); i < entityTagNamesCount; i++ {
src, entityTagNameBytes, err = encoding.DecodeBytes(src)
if err != nil {
return fmt.Errorf("cannot unmarshal topNValue.entityTagName: %w", err)
}
t.entityTagNames = append(t.entityTagNames, convert.BytesToString(entityTagNameBytes))
}
var valuesCount uint64
src, valuesCount = encoding.BytesToVarUint64(src)
if len(src) < 1 {
return fmt.Errorf("cannot unmarshal topNValue.encodeType: src is too short")
}
t.encodeType = encoding.EncodeType(src[0])
src = src[1:]
if len(src) < 1 {
return fmt.Errorf("cannot unmarshal topNValue.firstValue: src is too short")
}
src, t.firstValue, err = encoding.BytesToVarInt64(src)
if err != nil {
return fmt.Errorf("cannot unmarshal topNValue.firstValue: %w", err)
}
if len(src) < 1 {
return fmt.Errorf("cannot unmarshal topNValue.valueLen: src is too short")
}
var valueLen uint64
src, valueLen = encoding.BytesToVarUint64(src)
if uint64(len(src)) < valueLen {
return fmt.Errorf("src is too short for reading values with size %d; len(src)=%d", valueLen, len(src))
}
t.values, err = encoding.BytesToInt64List(t.values, src[:valueLen], t.encodeType, t.firstValue, int(valuesCount))
if err != nil {
return fmt.Errorf("cannot unmarshal topNValue.values: %w", err)
}
decoder.Reset()
evv := t.entityValuesBuf
evv, err = decoder.Decode(evv[:0], src[valueLen:], valuesCount)
if err != nil {
return fmt.Errorf("cannot unmarshal topNValue.entityValues: %w", err)
}
t.resizeEntities(len(evv), int(entityTagNamesCount))
for i, ev := range evv {
t.buf, t.entities[i], err = pbv1.UnmarshalTagValues(t.buf, t.entities[i], ev)
if err != nil {
return fmt.Errorf("cannot unmarshal topNValue.entityValues[%d]:%s %w", i, hex.EncodeToString(ev), err)
}
if len(t.entities[i]) != len(t.entityTagNames) {
return fmt.Errorf("entityValues[%d] length is not equal to entityTagNames", i)
}
}
return nil
}
// GroupName joins the group tag values into a single group key separated by "|",
// e.g. ["us-east", "svc-a"] becomes "us-east|svc-a".
func GroupName(groupTags []string) string {
return strings.Join(groupTags, "|")
}