func()

in banyand/dquery/topn.go [50:166]


// Rev handles a TopN query event on the distributed query path.
//
// It validates the request (sort direction and aggregation function must be
// specified), resolves node selectors for every requested group, broadcasts
// the request on data.TopicTopNQuery, and merges the per-node results through
// a post aggregator into a single TopNResponse.
//
// Failures are reported as a *common.Error payload. When request.Trace is set,
// a tracer is attached and the deferred hook replaces an error payload with a
// TopNResponse that carries only the trace (the error is recorded on the span).
// If the incoming message does not hold a *measurev1.TopNRequest, the zero
// bus.Message is returned.
func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp bus.Message) {
	request, ok := message.Data().(*measurev1.TopNRequest)
	if !ok {
		t.log.Warn().Msg("invalid event data type")
		return resp
	}
	n := time.Now()
	// Generated getters are nil-safe, so a request without a time range (or
	// without a begin timestamp) yields ID 0 instead of a nil dereference.
	now := bus.MessageID(request.GetTimeRange().GetBegin().GetNanos())
	if request.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
		return bus.NewMessage(now, common.NewError("unspecified requested sort direction"))
	}
	if request.GetAgg() == modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED {
		return bus.NewMessage(now, common.NewError("unspecified requested aggregation function"))
	}
	if e := t.log.Debug(); e.Enabled() {
		e.RawJSON("req", logger.Proto(request)).Msg("received a topN query event")
	}

	// Resolve the node selectors of every requested group.
	nodeSelectors := make(map[string][]string, len(request.Groups))
	for _, g := range request.Groups {
		gs, loaded := t.measureService.LoadGroup(g)
		if !loaded {
			t.log.Error().Str("group", g).Msg("failed to load group")
			return bus.NewMessage(now, common.NewError("failed to load group %s", g))
		}
		// Nil-safe access: a group without schema or resource options must not panic here.
		resourceOpts := gs.GetSchema().GetResourceOpts()
		if ns, exist := t.parseNodeSelector(request.Stages, resourceOpts); exist {
			nodeSelectors[g] = ns
		} else if len(resourceOpts.GetStages()) > 0 {
			// The group declares stages, but none could be selected for this request.
			t.log.Error().Strs("req_stages", request.Stages).Strs("default_stages", resourceOpts.GetDefaultStages()).Msg("no stage found")
			return bus.NewMessage(now, common.NewError("no stage found in request or default stages in resource opts"))
		}
	}
	if len(request.Stages) > 0 && len(nodeSelectors) == 0 {
		t.log.Error().RawJSON("req", logger.Proto(request)).Msg("no stage found")
		return bus.NewMessage(now, common.NewError("no stage found"))
	}

	if request.Trace {
		var tracer *pkgquery.Tracer
		tracer, ctx = pkgquery.NewTracer(ctx, n.Format(time.RFC3339Nano))
		span, _ := tracer.StartSpan(ctx, "distributed-client")
		span.Tag("request", convert.BytesToString(logger.Proto(request)))
		span.Tagf("nodeSelectors", "%v", nodeSelectors)
		// Every return below this point sets resp before the hook runs.
		defer func() {
			// Named payload (not data) to avoid shadowing the imported data package.
			switch payload := resp.Data().(type) {
			case *measurev1.TopNResponse:
				payload.Trace = tracer.ToProto()
			case *common.Error:
				// The error is recorded on the span; the caller receives a
				// response that carries only the trace.
				span.Error(errors.New(payload.Error()))
				resp = bus.NewMessage(now, &measurev1.TopNResponse{Trace: tracer.ToProto()})
			default:
				panic("unexpected data type")
			}
			span.Stop()
		}()
	}

	// The requested aggregation is kept for the local post aggregator while the
	// broadcast request carries an unspecified one (presumably so data nodes
	// return un-aggregated items - confirm against the data node handler).
	// NOTE: request is mutated in place, so later uses of request (e.g. the
	// slow-query log below) see the cleared Agg.
	agg := request.Agg
	request.Agg = modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED
	ff, err := t.broadcaster.Broadcast(defaultTopNQueryTimeout, data.TopicTopNQuery, bus.NewMessageWithNodeSelectors(now, nodeSelectors, request.TimeRange, request))
	if err != nil {
		return bus.NewMessage(now, common.NewError("execute the query %s: %v", request.GetName(), err))
	}

	aggregator := query.CreateTopNPostAggregator(request.GetTopN(), agg, request.GetFieldValueSort())
	var (
		allErr error
		tags   []string
	)
	// Every future is consumed, even after a failure, and the failures are
	// collected into allErr.
	for _, f := range ff {
		m, getErr := f.Get()
		if getErr != nil {
			allErr = multierr.Append(allErr, getErr)
			continue
		}
		d := m.Data()
		if d == nil {
			continue
		}
		topNResp, isTopN := d.(*measurev1.TopNResponse)
		if !isTopN {
			// Report an unexpected payload as a query error instead of panicking.
			allErr = multierr.Append(allErr, errors.New("unexpected response data type from data node"))
			continue
		}
	mergeLists:
		for _, l := range topNResp.GetLists() {
			// The timestamp is per list, so convert it once rather than per item.
			ts := uint64(l.GetTimestamp().AsTime().UnixMilli())
			for _, tn := range l.GetItems() {
				entity := tn.GetEntity()
				if tags == nil {
					// The tag names are taken from the first item seen.
					tags = make([]string, 0, len(entity))
					for _, e := range entity {
						tags = append(tags, e.GetKey())
					}
				}
				entityValues := make(pbv1.EntityValues, 0, len(entity))
				for _, e := range entity {
					entityValues = append(entityValues, e.GetValue())
				}
				if putErr := aggregator.Put(entityValues, tn.GetValue().GetInt().GetValue(), ts); putErr != nil {
					// Surface the failure instead of returning a silently incomplete
					// result; one error per node response keeps the message bounded.
					allErr = multierr.Append(allErr, putErr)
					break mergeLists
				}
			}
		}
	}
	if allErr != nil {
		return bus.NewMessage(now, common.NewError("execute the query %s: %v", request.GetName(), allErr))
	}
	if tags == nil {
		// No node returned any item.
		return bus.NewMessage(now, &measurev1.TopNResponse{})
	}
	lists := aggregator.Val(tags)
	resp = bus.NewMessage(now, &measurev1.TopNResponse{
		Lists: lists,
	})
	// Slow-query logging is skipped for traced requests.
	if !request.Trace && t.slowQuery > 0 {
		latency := time.Since(n)
		if latency > t.slowQuery {
			t.log.Warn().Dur("latency", latency).RawJSON("req", logger.Proto(request)).Int("resp_count", len(lists)).Msg("top_n slow query")
		}
	}
	return resp
}