in banyand/dquery/topn.go [50:166]
// Rev handles a distributed topN query event.
//
// It validates the request, resolves the node selectors of every requested
// group, broadcasts the query with the aggregation function cleared, and then
// merges the partial results with a post-aggregator that applies the
// originally requested aggregation function.
//
// The returned message carries either a *measurev1.TopNResponse or an error
// created by common.NewError. When request.Trace is set, an error result is
// replaced by a TopNResponse that only carries the trace (the error is
// recorded on the span). If the incoming payload is not a
// *measurev1.TopNRequest, the zero-value message is returned.
func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp bus.Message) {
	request, ok := message.Data().(*measurev1.TopNRequest)
	if !ok {
		t.log.Warn().Msg("invalid event data type")
		return
	}
	start := time.Now()
	// Use the nil-safe generated getters so a request without a time range
	// does not panic here; the message ID is then 0.
	msgID := bus.MessageID(request.GetTimeRange().GetBegin().GetNanos())
	if request.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
		resp = bus.NewMessage(msgID, common.NewError("unspecified requested sort direction"))
		return
	}
	if request.GetAgg() == modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED {
		resp = bus.NewMessage(msgID, common.NewError("unspecified requested aggregation function"))
		return
	}
	if e := t.log.Debug(); e.Enabled() {
		e.RawJSON("req", logger.Proto(request)).Msg("received a topN query event")
	}

	// Resolve the node selectors for every requested group.
	nodeSelectors := make(map[string][]string)
	for _, g := range request.Groups {
		gs, loaded := t.measureService.LoadGroup(g)
		if !loaded {
			t.log.Error().Str("group", g).Msg("failed to load group")
			resp = bus.NewMessage(msgID, common.NewError("failed to load group %s", g))
			return
		}
		// Getters keep a missing schema or missing resource options from
		// causing a nil dereference in this function.
		resourceOpts := gs.GetSchema().GetResourceOpts()
		if ns, exist := t.parseNodeSelector(request.Stages, resourceOpts); exist {
			nodeSelectors[g] = ns
			continue
		}
		// A group that declares stages must resolve to at least one of them.
		if len(resourceOpts.GetStages()) > 0 {
			t.log.Error().Strs("req_stages", request.Stages).Strs("default_stages", resourceOpts.GetDefaultStages()).Msg("no stage found")
			resp = bus.NewMessage(msgID, common.NewError("no stage found in request or default stages in resource opts"))
			return
		}
	}
	if len(request.Stages) > 0 && len(nodeSelectors) == 0 {
		t.log.Error().RawJSON("req", logger.Proto(request)).Msg("no stage found")
		resp = bus.NewMessage(msgID, common.NewError("no stage found"))
		return
	}

	if request.Trace {
		var tracer *pkgquery.Tracer
		tracer, ctx = pkgquery.NewTracer(ctx, start.Format(time.RFC3339Nano))
		span, _ := tracer.StartSpan(ctx, "distributed-client")
		span.Tag("request", convert.BytesToString(logger.Proto(request)))
		span.Tagf("nodeSelectors", "%v", nodeSelectors)
		// Attach the trace to whatever result the code below produces.
		defer func() {
			switch d := resp.Data().(type) {
			case *measurev1.TopNResponse:
				d.Trace = tracer.ToProto()
			case *common.Error:
				// The error is recorded on the span and the response is
				// replaced by one that only carries the trace.
				span.Error(errors.New(d.Error()))
				resp = bus.NewMessage(msgID, &measurev1.TopNResponse{Trace: tracer.ToProto()})
			default:
				panic("unexpected data type")
			}
			span.Stop()
		}()
	}

	// The aggregation function is cleared on the broadcast request and applied
	// locally by the post-aggregator instead. Note that this mutates the
	// incoming request.
	agg := request.Agg
	request.Agg = modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED
	ff, err := t.broadcaster.Broadcast(defaultTopNQueryTimeout, data.TopicTopNQuery,
		bus.NewMessageWithNodeSelectors(msgID, nodeSelectors, request.TimeRange, request))
	if err != nil {
		resp = bus.NewMessage(msgID, common.NewError("execute the query %s: %v", request.GetName(), err))
		return
	}

	var allErr error
	aggregator := query.CreateTopNPostAggregator(request.GetTopN(), agg, request.GetFieldValueSort())
	// tags holds the entity keys, taken from the first item seen.
	var tags []string
	for _, f := range ff {
		m, getErr := f.Get()
		if getErr != nil {
			allErr = multierr.Append(allErr, getErr)
			continue
		}
		var topNResp *measurev1.TopNResponse
		switch d := m.Data().(type) {
		case nil:
			continue
		case *measurev1.TopNResponse:
			topNResp = d
		case *common.Error:
			// Report an error payload instead of panicking on a blind
			// type assertion.
			allErr = multierr.Append(allErr, errors.New(d.Error()))
			continue
		default:
			allErr = multierr.Append(allErr, errors.New("unexpected data type in topN response"))
			continue
		}
		for _, l := range topNResp.GetLists() {
			for _, tn := range l.Items {
				if tags == nil {
					tags = make([]string, 0, len(tn.Entity))
					for _, e := range tn.Entity {
						tags = append(tags, e.Key)
					}
				}
				entityValues := make(pbv1.EntityValues, 0, len(tn.Entity))
				for _, e := range tn.Entity {
					entityValues = append(entityValues, e.Value)
				}
				// NOTE(review): the error returned by Put is discarded, as in
				// the original code; confirm that this is intentional.
				_ = aggregator.Put(entityValues, tn.Value.GetInt().GetValue(), uint64(l.Timestamp.AsTime().UnixMilli()))
			}
		}
	}
	if allErr != nil {
		resp = bus.NewMessage(msgID, common.NewError("execute the query %s: %v", request.GetName(), allErr))
		return
	}
	// No item was received from any node: answer with an empty response.
	if tags == nil {
		resp = bus.NewMessage(msgID, &measurev1.TopNResponse{})
		return
	}
	lists := aggregator.Val(tags)
	resp = bus.NewMessage(msgID, &measurev1.TopNResponse{
		Lists: lists,
	})
	// Slow-query logging is skipped for traced requests.
	if !request.Trace && t.slowQuery > 0 {
		if latency := time.Since(start); latency > t.slowQuery {
			t.log.Warn().Dur("latency", latency).RawJSON("req", logger.Proto(request)).Int("resp_count", len(lists)).Msg("top_n slow query")
		}
	}
	return
}