in banyand/query/processor_topn.go [51:144]
// Rev handles a TopN query event received from the bus.
//
// The message payload must be a *measurev1.TopNRequest carrying an explicit
// field-value sort direction; when the TopN aggregation schema pins a sort
// direction, the request must match it. For every shard returned by the source
// measure's CompanionShards, the series located by the request entity are
// scanned, each item is parsed with parseTopNFamily, and the result is fed
// into a post-aggregator whose final value becomes the response payload.
//
// On any validation or processing failure the problem is logged and the
// zero-value resp is returned; no partial result is emitted.
func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
	request, ok := message.Data().(*measurev1.TopNRequest)
	if !ok {
		t.log.Warn().Msg("invalid event data type")
		return
	}
	if request.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
		t.log.Warn().Msg("invalid requested sort direction")
		return
	}
	if e := t.log.Debug(); e.Enabled() {
		e.Stringer("req", request).Msg("received a topN query event")
	}
	topNMetadata := request.GetMetadata()
	topNSchema, err := t.metaService.TopNAggregationRegistry().GetTopNAggregation(context.TODO(), topNMetadata)
	if err != nil {
		t.log.Error().Err(err).
			Str("topN", topNMetadata.GetName()).
			Msg("fail to get execution context")
		return
	}
	// A schema with an unspecified sort accepts either direction; otherwise
	// the requested direction has to agree with the schema.
	if topNSchema.GetFieldValueSort() != modelv1.Sort_SORT_UNSPECIFIED &&
		topNSchema.GetFieldValueSort() != request.GetFieldValueSort() {
		t.log.Warn().Msg("unmatched sort direction")
		return
	}
	sourceMeasure, err := t.measureService.Measure(topNSchema.GetSourceMeasure())
	if err != nil {
		t.log.Error().Err(err).
			Str("topN", topNMetadata.GetName()).
			Msg("fail to find source measure")
		return
	}
	shards, err := sourceMeasure.CompanionShards(topNMetadata)
	if err != nil {
		t.log.Error().Err(err).
			Str("topN", topNMetadata.GetName()).
			Msg("fail to list shards")
		return
	}
	aggregator := createTopNPostAggregator(request.GetTopN(),
		request.GetAgg(), request.GetFieldValueSort())
	entity, err := locateEntity(topNSchema, request.GetFieldValueSort(), request.GetConditions())
	if err != nil {
		t.log.Error().Err(err).
			Str("topN", topNMetadata.GetName()).
			Msg("fail to parse entity")
		return
	}
	// The logger-carrying context is identical for every shard, so build it once.
	listCtx := context.WithValue(context.Background(), logger.ContextKey, t.log)
	for _, shard := range shards {
		// TODO: support condition
		sl, innerErr := shard.Series().List(listCtx, tsdb.NewPath(entity))
		if innerErr != nil {
			t.log.Error().Err(innerErr).
				Str("topN", topNMetadata.GetName()).
				Msg("fail to list series")
			return
		}
		for _, series := range sl {
			iters, scanErr := t.scanSeries(series, request)
			if scanErr != nil {
				// Report the scan error itself; innerErr is always nil here.
				t.log.Error().Err(scanErr).
					Str("topN", topNMetadata.GetName()).
					Msg("fail to scan series")
				return
			}
			if len(iters) < 1 {
				continue
			}
			// parseErr records the first parse failure. Once it is set, the
			// remaining iterators are no longer read but are still closed, so
			// an early exit does not leak them.
			var parseErr error
			for _, iter := range iters {
				for parseErr == nil && iter.Next() {
					tuple, pErr := parseTopNFamily(iter.Val(), sourceMeasure.GetInterval())
					if pErr != nil {
						parseErr = pErr
						break
					}
					// The result of put is discarded, as in the original implementation.
					_ = aggregator.put(tuple.V1.([]*modelv1.TagValue), tuple.V2.(int64), iter.Val().Time())
				}
				// Close is best-effort; its result is intentionally ignored.
				_ = iter.Close()
			}
			if parseErr != nil {
				t.log.Error().Err(parseErr).
					Str("topN", topNMetadata.GetName()).
					Msg("fail to parse topN family")
				return
			}
		}
	}
	// The current wall-clock time in nanoseconds doubles as the message ID.
	now := time.Now().UnixNano()
	resp = bus.NewMessage(bus.MessageID(now), aggregator.val(sourceMeasure.GetSchema().GetEntity().GetTagNames()))
	return
}