func()

in banyand/measure/query.go [280:359]


func (s *measure) buildIndexQueryResult(ctx context.Context, mqo model.MeasureQueryOptions,
	segments []storage.Segment[*tsTable, option],
) (model.MeasureQueryResult, error) {
	defer func() {
		for i := range segments {
			segments[i].DecRef()
		}
	}()
	is := s.indexSchema.Load().(indexSchema)
	r := &indexSortResult{}
	var indexProjection []index.FieldKey
	for _, tp := range mqo.TagProjection {
		tagFamilyLocation := tagFamilyLocation{
			name:                   tp.Family,
			fieldToValueType:       make(map[string]tagNameWithType),
			projectedEntityOffsets: make(map[string]int),
		}
	TAG:
		for _, n := range tp.Names {
			tagFamilyLocation.tagNames = append(tagFamilyLocation.tagNames, n)
			for i := range s.schema.GetEntity().GetTagNames() {
				if n == s.schema.GetEntity().GetTagNames()[i] {
					tagFamilyLocation.projectedEntityOffsets[n] = i
					continue TAG
				}
			}
			if is.fieldIndexLocation != nil {
				if fields, ok := is.fieldIndexLocation[tp.Family]; ok {
					if field, ok := fields[n]; ok {
						indexProjection = append(indexProjection, field.Key)
						tagFamilyLocation.fieldToValueType[n] = tagNameWithType{
							fieldName: field.Key.Marshal(),
							typ:       field.Type,
						}
						continue TAG
					}
				}
			}
			return nil, fmt.Errorf("tag %s not found in schema", n)
		}
		r.tfl = append(r.tfl, tagFamilyLocation)
	}
	var err error
	opts := storage.IndexSearchOpts{
		Query:       mqo.Query,
		Order:       mqo.Order,
		PreloadSize: preloadSize,
		Projection:  indexProjection,
	}
	seriesFilter := roaring.NewPostingList()
	for i := range segments {
		if mqo.TimeRange.Include(segments[i].GetTimeRange()) {
			opts.TimeRange = nil
		} else {
			opts.TimeRange = mqo.TimeRange
		}
		sr := &segResult{}
		sr.SeriesData, sr.sortedValues, err = segments[i].IndexDB().SearchWithoutSeries(ctx, opts)
		if err != nil {
			return nil, err
		}
		for j := 0; j < len(sr.SeriesList); j++ {
			if seriesFilter.Contains(uint64(sr.SeriesList[j].ID)) {
				sr.remove(j)
				j--
				continue
			}
			seriesFilter.Insert(uint64(sr.SeriesList[j].ID))
		}
		if len(sr.SeriesList) < 1 {
			continue
		}
		r.segResults = append(r.segResults, sr)
	}
	if len(r.segResults) < 1 {
		return nilResult, nil
	}
	heap.Init(&r.segResults)
	return r, nil
}