func()

in banyand/query/processor_topn.go [51:144]


// Rev handles a topN query message and returns the aggregated result.
//
// The message payload must be a *measurev1.TopNRequest with a specified
// field-value sort direction. The method resolves the topN aggregation schema
// and its source measure, lists the companion shards, and then walks every
// series located by the request's entity, feeding each parsed item into a
// post-aggregator.
//
// On success resp carries the aggregator's value and uses the current Unix
// time in nanoseconds as its message ID. On any failure the problem is logged
// and resp is returned as its zero value; no error is reported to the caller.
func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
	request, ok := message.Data().(*measurev1.TopNRequest)
	if !ok {
		t.log.Warn().Msg("invalid event data type")
		return
	}
	if request.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
		t.log.Warn().Msg("invalid requested sort direction")
		return
	}
	if e := t.log.Debug(); e.Enabled() {
		e.Stringer("req", request).Msg("received a topN query event")
	}
	topNMetadata := request.GetMetadata()
	topNSchema, err := t.metaService.TopNAggregationRegistry().GetTopNAggregation(context.TODO(), topNMetadata)
	if err != nil {
		t.log.Error().Err(err).
			Str("topN", topNMetadata.GetName()).
			Msg("fail to get execution context")
		return
	}
	// A schema that pins a sort direction only accepts requests using that
	// same direction; an unspecified schema direction accepts either.
	if topNSchema.GetFieldValueSort() != modelv1.Sort_SORT_UNSPECIFIED &&
		topNSchema.GetFieldValueSort() != request.GetFieldValueSort() {
		t.log.Warn().Msg("unmatched sort direction")
		return
	}
	sourceMeasure, err := t.measureService.Measure(topNSchema.GetSourceMeasure())
	if err != nil {
		t.log.Error().Err(err).
			Str("topN", topNMetadata.GetName()).
			Msg("fail to find source measure")
		return
	}
	shards, err := sourceMeasure.CompanionShards(topNMetadata)
	if err != nil {
		t.log.Error().Err(err).
			Str("topN", topNMetadata.GetName()).
			Msg("fail to list shards")
		return
	}
	aggregator := createTopNPostAggregator(request.GetTopN(),
		request.GetAgg(), request.GetFieldValueSort())
	entity, err := locateEntity(topNSchema, request.GetFieldValueSort(), request.GetConditions())
	if err != nil {
		t.log.Error().Err(err).
			Str("topN", topNMetadata.GetName()).
			Msg("fail to parse entity")
		return
	}
	for _, shard := range shards {
		// TODO: support condition
		sl, innerErr := shard.Series().List(context.WithValue(
			context.Background(),
			logger.ContextKey,
			t.log,
		), tsdb.NewPath(entity))
		if innerErr != nil {
			t.log.Error().Err(innerErr).
				Str("topN", topNMetadata.GetName()).
				Msg("fail to list series")
			return
		}
		for _, series := range sl {
			iters, scanErr := t.scanSeries(series, request)
			if scanErr != nil {
				// Log the scan failure itself; innerErr is always nil here.
				t.log.Error().Err(scanErr).
					Str("topN", topNMetadata.GetName()).
					Msg("fail to scan series")
				return
			}
			for i, iter := range iters {
				for iter.Next() {
					tuple, parseErr := parseTopNFamily(iter.Val(), sourceMeasure.GetInterval())
					if parseErr != nil {
						t.log.Error().Err(parseErr).
							Str("topN", topNMetadata.GetName()).
							Msg("fail to parse topN family")
						// Release the current iterator and every one not yet
						// visited; earlier ones were closed after draining.
						for _, pending := range iters[i:] {
							_ = pending.Close()
						}
						return
					}
					// The type assertions are unchecked and panic if the tuple
					// holds other dynamic types.
					// NOTE(review): the result of put is discarded; confirm
					// whether it can report a failure worth surfacing.
					_ = aggregator.put(tuple.V1.([]*modelv1.TagValue), tuple.V2.(int64), iter.Val().Time())
				}
				_ = iter.Close()
			}
		}
	}

	now := time.Now().UnixNano()
	resp = bus.NewMessage(bus.MessageID(now), aggregator.val(sourceMeasure.GetSchema().GetEntity().GetTagNames()))

	return
}