banyand/dquery/topn.go (168 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package dquery
import (
"context"
"errors"
"time"
"go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/measure"
"github.com/apache/skywalking-banyandb/banyand/query"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/iter/sort"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
pkgquery "github.com/apache/skywalking-banyandb/pkg/query"
)
// defaultTopNQueryTimeout is the timeout handed to the broadcaster when a TopN
// query is fanned out to the data nodes.
const defaultTopNQueryTimeout = 10 * time.Second
// topNQueryProcessor is the bus listener that serves distributed TopN queries:
// it broadcasts the request and merges the partial results (see Rev).
type topNQueryProcessor struct {
// measureService is used to load the groups named in a request.
measureService measure.SchemaService
// broadcaster fans the query out on data.TopicTopNQuery.
broadcaster bus.Broadcaster
// NOTE(review): Rev relies on log, slowQuery and parseNodeSelector, presumably
// promoted from this embedded queryService — confirm against its declaration.
*queryService
*bus.UnImplementedHealthyListener
}
// Rev handles a distributed TopN query event. It validates the request, resolves
// the node selectors of every requested group, broadcasts the query on
// data.TopicTopNQuery and merges the partial results with a post aggregator.
//
// The returned message carries either a *measurev1.TopNResponse or a
// *common.Error. If the payload is not a *measurev1.TopNRequest, a warning is
// logged and the zero message is returned. When request.Trace is set, a failure
// is recorded on the trace span and the response is replaced by a TopNResponse
// that only carries the trace.
func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp bus.Message) {
	request, ok := message.Data().(*measurev1.TopNRequest)
	if !ok {
		t.log.Warn().Msg("invalid event data type")
		return
	}
	start := time.Now()
	// The message ID is taken from the nanosecond part of the range start. The
	// nil-safe getters keep a request without a time range from panicking here.
	msgID := bus.MessageID(request.GetTimeRange().GetBegin().GetNanos())
	if request.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
		resp = bus.NewMessage(msgID, common.NewError("unspecified requested sort direction"))
		return
	}
	if request.GetAgg() == modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED {
		resp = bus.NewMessage(msgID, common.NewError("unspecified requested aggregation function"))
		return
	}
	if e := t.log.Debug(); e.Enabled() {
		e.RawJSON("req", logger.Proto(request)).Msg("received a topN query event")
	}

	// Resolve the node selectors group by group; any group that cannot be
	// loaded or matched to a stage aborts the whole query.
	nodeSelectors := make(map[string][]string)
	for _, g := range request.Groups {
		gs, loaded := t.measureService.LoadGroup(g)
		if !loaded {
			t.log.Error().Str("group", g).Msg("failed to load group")
			resp = bus.NewMessage(msgID, common.NewError("failed to load group %s", g))
			return
		}
		// GetResourceOpts/GetStages are nil-safe, so a group without resource
		// options is treated as having no stages instead of panicking.
		resourceOpts := gs.GetSchema().GetResourceOpts()
		if ns, exist := t.parseNodeSelector(request.Stages, resourceOpts); exist {
			nodeSelectors[g] = ns
		} else if len(resourceOpts.GetStages()) > 0 {
			t.log.Error().Strs("req_stages", request.Stages).Strs("default_stages", resourceOpts.GetDefaultStages()).Msg("no stage found")
			resp = bus.NewMessage(msgID, common.NewError("no stage found in request or default stages in resource opts"))
			return
		}
	}
	if len(request.Stages) > 0 && len(nodeSelectors) == 0 {
		t.log.Error().RawJSON("req", logger.Proto(request)).Msg("no stage found")
		resp = bus.NewMessage(msgID, common.NewError("no stage found"))
		return
	}

	if request.Trace {
		var tracer *pkgquery.Tracer
		tracer, ctx = pkgquery.NewTracer(ctx, start.Format(time.RFC3339Nano))
		span, _ := tracer.StartSpan(ctx, "distributed-client")
		span.Tag("request", convert.BytesToString(logger.Proto(request)))
		span.Tagf("nodeSelectors", "%v", nodeSelectors)
		// Attach the trace to whatever response is about to be returned. Every
		// return below this point assigns resp first, so the default branch
		// only fires on a programming error.
		defer func() {
			switch d := resp.Data().(type) {
			case *measurev1.TopNResponse:
				d.Trace = tracer.ToProto()
			case *common.Error:
				span.Error(errors.New(d.Error()))
				resp = bus.NewMessage(msgID, &measurev1.TopNResponse{Trace: tracer.ToProto()})
			default:
				panic("unexpected data type")
			}
			span.Stop()
		}()
	}

	// The aggregation function is cleared on the broadcast request and applied
	// locally by the post aggregator instead.
	// NOTE(review): this mutates the caller's request and is never restored, so
	// the slow-query log below prints the request without its aggregation.
	agg := request.Agg
	request.Agg = modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED
	ff, err := t.broadcaster.Broadcast(defaultTopNQueryTimeout, data.TopicTopNQuery, bus.NewMessageWithNodeSelectors(msgID, nodeSelectors, request.TimeRange, request))
	if err != nil {
		resp = bus.NewMessage(msgID, common.NewError("execute the query %s: %v", request.GetName(), err))
		return
	}

	var allErr error
	aggregator := query.CreateTopNPostAggregator(request.GetTopN(),
		agg, request.GetFieldValueSort())
	// tags holds the entity tag names, captured from the first item seen; it
	// stays nil when no node returned any item.
	var tags []string
	for _, f := range ff {
		m, getErr := f.Get()
		if getErr != nil {
			allErr = multierr.Append(allErr, getErr)
			continue
		}
		// Inspect the payload type instead of asserting it blindly: a node
		// answering with anything but a TopNResponse must fail the query, not
		// crash the process.
		switch d := m.Data().(type) {
		case nil:
			continue
		case *measurev1.TopNResponse:
			for _, l := range d.GetLists() {
				for _, tn := range l.GetItems() {
					entity := tn.GetEntity()
					if tags == nil {
						tags = make([]string, 0, len(entity))
						for _, e := range entity {
							tags = append(tags, e.GetKey())
						}
					}
					entityValues := make(pbv1.EntityValues, 0, len(entity))
					for _, e := range entity {
						entityValues = append(entityValues, e.GetValue())
					}
					// NOTE(review): the error returned by Put is discarded, as
					// before; confirm whether it should fail the query.
					_ = aggregator.Put(entityValues, tn.GetValue().GetInt().GetValue(), uint64(l.GetTimestamp().AsTime().UnixMilli()))
				}
			}
		case *common.Error:
			allErr = multierr.Append(allErr, errors.New(d.Error()))
		default:
			allErr = multierr.Append(allErr, errors.New("unexpected response data type from data node"))
		}
	}
	if allErr != nil {
		resp = bus.NewMessage(msgID, common.NewError("execute the query %s: %v", request.GetName(), allErr))
		return
	}
	if tags == nil {
		resp = bus.NewMessage(msgID, &measurev1.TopNResponse{})
		return
	}
	lists := aggregator.Val(tags)
	resp = bus.NewMessage(msgID, &measurev1.TopNResponse{
		Lists: lists,
	})
	// Slow queries are only reported for successful, non-traced requests.
	if !request.Trace && t.slowQuery > 0 {
		latency := time.Since(start)
		if latency > t.slowQuery {
			t.log.Warn().Dur("latency", latency).RawJSON("req", logger.Proto(request)).Int("resp_count", len(lists)).Msg("top_n slow query")
		}
	}
	return
}
// Compile-time check that *comparableTopNItem satisfies sort.Comparable.
var _ sort.Comparable = (*comparableTopNItem)(nil)
// comparableTopNItem wraps a TopN list item so that it can be ordered through
// sort.Comparable; the sort key is produced by SortedField.
type comparableTopNItem struct {
*measurev1.TopNList_Item
}
// SortedField returns the sort key of the item: its integer field value encoded
// with convert.Int64ToBytes.
//
// The value is read through the generated getters, which are nil-safe, so an
// item whose field value is absent or not an integer yields the encoding of 0
// instead of dereferencing a nil pointer.
func (c *comparableTopNItem) SortedField() []byte {
	return convert.Int64ToBytes(c.GetValue().GetInt().GetValue())
}
// Compile-time check that *sortedTopNList satisfies sort.Iterator.
var _ sort.Iterator[*comparableTopNItem] = (*sortedTopNList)(nil)
// sortedTopNList iterates over the items of a TopNList in slice order.
// NOTE(review): the name suggests the items are already sorted by value —
// nothing in this type enforces that; confirm with the producer of the list.
type sortedTopNList struct {
*measurev1.TopNList
// index is the cursor advanced by Next; Val reads Items[index-1].
index int
}
// Close is a no-op that always returns nil.
func (*sortedTopNList) Close() error {
return nil
}
// Next advances the cursor by one item and reports whether an item is now
// available through Val. It returns false, without moving the cursor, once all
// items have been visited.
func (s *sortedTopNList) Next() bool {
	if s.index >= len(s.Items) {
		return false
	}
	// Val reads Items[index-1], so every successful advance exposes a valid
	// item. Re-checking index against the length after the increment would
	// drop the last item (and the only item of a one-element list).
	s.index++
	return true
}
// Val wraps the item at position index-1 in a freshly allocated
// comparableTopNItem. The cursor is advanced by Next; with the cursor still at
// zero the lookup indexes out of range.
func (s *sortedTopNList) Val() *comparableTopNItem {
	current := s.Items[s.index-1]
	wrapped := comparableTopNItem{current}
	return &wrapped
}