in banyand/query/processor_topn.go [51:172]
// Rev handles a TopN query event delivered over the bus and builds the reply.
//
// The message payload must be a *measurev1.TopNRequest that names exactly one
// group and carries an explicit sort direction. The processor resolves the TopN
// aggregation schema and its source measure, analyzes the request into a plan,
// executes the plan against the group's TopN result measure and collects the
// first data point of every iterator round into the response.
//
// Outcomes:
//   - on success resp carries the value built by toTopNResponse;
//   - when the request names zero or several groups, or analysis/execution
//     fails, resp carries a common error;
//   - for the remaining validation and lookup failures the problem is only
//     logged and the zero-value bus.Message is returned.
//
// When request.Trace is set, a tracer is attached to ctx and the collected
// trace is embedded into the reply, including on the error path.
func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp bus.Message) {
	request, ok := message.Data().(*measurev1.TopNRequest)
	// n is the start time used for the trace name and slow-query latency;
	// now doubles as the reply message ID.
	n := time.Now()
	now := n.UnixNano()
	if !ok {
		t.log.Warn().Msg("invalid event data type")
		return
	}
	// Exactly one group is required. Rejecting the empty list here also guards
	// the request.Groups[0] accesses below against an index-out-of-range panic.
	if len(request.Groups) != 1 {
		resp = bus.NewMessage(bus.MessageID(now), common.NewError("only support one group in the query request"))
		return
	}
	ml := t.log.Named("topn", request.Groups[0], request.Name)
	if e := ml.Debug(); e.Enabled() {
		e.RawJSON("req", logger.Proto(request)).Msg("received a topn event")
	}
	if request.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
		t.log.Warn().Msg("invalid requested sort direction")
		return
	}
	if e := t.log.Debug(); e.Enabled() {
		e.Stringer("req", request).Msg("received a topN query event")
	}
	topNMetadata := &commonv1.Metadata{
		Name:  request.Name,
		Group: request.Groups[0],
	}
	topNSchema, err := t.metaService.TopNAggregationRegistry().GetTopNAggregation(ctx, topNMetadata)
	if err != nil {
		t.log.Error().Err(err).
			Str("topN", topNMetadata.GetName()).
			Msg("fail to get execution context")
		return
	}
	// A schema with an explicit sort direction only serves requests that ask
	// for the same direction.
	if topNSchema.GetFieldValueSort() != modelv1.Sort_SORT_UNSPECIFIED &&
		topNSchema.GetFieldValueSort() != request.GetFieldValueSort() {
		t.log.Warn().Msg("unmatched sort direction")
		return
	}
	sourceMeasure, err := t.measureService.Measure(topNSchema.GetSourceMeasure())
	if err != nil {
		t.log.Error().Err(err).
			Str("topN", topNMetadata.GetName()).
			Msg("fail to find source measure")
		return
	}
	plan, err := logical_measure.TopNAnalyze(request, sourceMeasure.GetSchema(), topNSchema)
	if err != nil {
		resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to analyze the query request for topn %s: %v", topNMetadata.GetName(), err))
		return
	}
	if e := ml.Debug(); e.Enabled() {
		e.Str("plan", plan.String()).Msg("topn plan")
	}
	topNResultMeasure, err := t.measureService.Measure(measure.GetTopNSchemaMetadata(topNMetadata.Group))
	if err != nil {
		ml.Error().Err(err).Str("topN", topNMetadata.GetName()).Msg("fail to find topn result measure")
		return
	}
	var tracer *query.Tracer
	var span *query.Span
	if request.Trace {
		tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano))
		span, ctx = tracer.StartSpan(ctx, "data-%s", t.queryService.nodeID)
		span.Tag("plan", plan.String())
		// Every return path below this point assigns resp, so the deferred
		// function can attach the trace to whatever reply was produced.
		defer func() {
			data := resp.Data()
			switch d := data.(type) {
			case *measurev1.TopNResponse:
				d.Trace = tracer.ToProto()
			case *common.Error:
				span.Error(errors.New(d.Error()))
				// Reply with the TopN response type (not the generic measure
				// QueryResponse) so both outcomes of a traced TopN query share
				// the same payload type.
				resp = bus.NewMessage(bus.MessageID(now), &measurev1.TopNResponse{Trace: tracer.ToProto()})
			default:
				panic("unexpected data type")
			}
			span.Stop()
		}()
	}
	mIterator, err := plan.(executor.MeasureExecutable).Execute(executor.WithMeasureExecutionContext(ctx, topNResultMeasure))
	if err != nil {
		ml.Error().Err(err).RawJSON("req", logger.Proto(request)).Msg("fail to execute the topn plan")
		resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to execute the topn plan for measure %s: %v", topNMetadata.GetName(), err))
		return
	}
	defer func() {
		if closeErr := mIterator.Close(); closeErr != nil {
			// The chain has to end with Msg, otherwise the event is built but
			// never emitted and the close failure goes unnoticed.
			ml.Error().Err(closeErr).RawJSON("req", logger.Proto(request)).Msg("fail to close the topn plan")
		}
	}()
	result := make([]*measurev1.DataPoint, 0)
	// The loop lives in its own function so the iterator span, when tracing is
	// enabled, is tagged and stopped as soon as iteration finishes.
	func() {
		var r int
		if tracer != nil {
			iterSpan, _ := tracer.StartSpan(ctx, "iterator")
			defer func() {
				iterSpan.Tag("rounds", fmt.Sprintf("%d", r))
				iterSpan.Tag("size", fmt.Sprintf("%d", len(result)))
				iterSpan.Stop()
			}()
		}
		for mIterator.Next() {
			r++
			current := mIterator.Current()
			// Only the first data point of each round is kept.
			if len(current) > 0 {
				result = append(result, current[0])
			}
		}
	}()
	resp = bus.NewMessage(bus.MessageID(now), toTopNResponse(result))
	// Slow-query reporting is skipped for traced requests and when no
	// threshold is configured.
	if !request.Trace && t.slowQuery > 0 {
		latency := time.Since(n)
		if latency > t.slowQuery {
			t.log.Warn().Dur("latency", latency).RawJSON("req", logger.Proto(request)).Int("resp_count", len(result)).Msg("top_n slow query")
		}
	}
	return
}