func()

in banyand/measure/query.go [187:278]


func (s *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, mqo model.MeasureQueryOptions,
	segments []storage.Segment[*tsTable, option],
) (sl []common.SeriesID, tables []*tsTable, storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue,
	newTagProjection []model.TagProjection, err error,
) {
	var indexProjection []index.FieldKey
	fieldToValueType := make(map[string]tagNameWithType)
	var projectedEntityOffsets map[string]int
	newTagProjection = make([]model.TagProjection, 0)
	is := s.indexSchema.Load().(indexSchema)
	for _, tp := range mqo.TagProjection {
		var tagProjection model.TagProjection
	TAG:
		for _, n := range tp.Names {
			for i := range s.schema.GetEntity().GetTagNames() {
				if n == s.schema.GetEntity().GetTagNames()[i] {
					if projectedEntityOffsets == nil {
						projectedEntityOffsets = make(map[string]int)
					}
					projectedEntityOffsets[n] = i
					continue TAG
				}
			}
			if is.fieldIndexLocation != nil {
				if fields, ok := is.fieldIndexLocation[tp.Family]; ok {
					if field, ok := fields[n]; ok {
						indexProjection = append(indexProjection, field.Key)
						fieldToValueType[field.Key.Marshal()] = tagNameWithType{
							fieldName: n,
							typ:       field.Type,
						}
						continue TAG
					}
				}
			}
			tagProjection.Family = tp.Family
			tagProjection.Names = append(tagProjection.Names, n)
		}
		if tagProjection.Family != "" {
			newTagProjection = append(newTagProjection, tagProjection)
		}
	}
	seriesFilter := roaring.NewPostingList()
	for i := range segments {
		sd, _, err := segments[i].IndexDB().Search(ctx, series, storage.IndexSearchOpts{
			Query:       mqo.Query,
			Order:       mqo.Order,
			PreloadSize: preloadSize,
			Projection:  indexProjection,
		})
		if err != nil {
			return nil, nil, nil, nil, err
		}
		if len(sd.SeriesList) > 0 {
			tables = append(tables, segments[i].Tables()...)
		}
		for j := range sd.SeriesList {
			if seriesFilter.Contains(uint64(sd.SeriesList[j].ID)) {
				continue
			}
			seriesFilter.Insert(uint64(sd.SeriesList[j].ID))
			sl = append(sl, sd.SeriesList[j].ID)
			if projectedEntityOffsets == nil && sd.Fields == nil {
				continue
			}
			if storedIndexValue == nil {
				storedIndexValue = make(map[common.SeriesID]map[string]*modelv1.TagValue)
			}
			tagValues := make(map[string]*modelv1.TagValue)
			storedIndexValue[sd.SeriesList[j].ID] = tagValues
			for name, offset := range projectedEntityOffsets {
				if offset < 0 || offset >= len(sd.SeriesList[j].EntityValues) {
					logger.Warningf("offset %d for tag %s is out of range for series ID %v", offset, name, sd.SeriesList[j].ID)
					tagValues[name] = pbv1.NullTagValue
					continue
				}
				tagValues[name] = sd.SeriesList[j].EntityValues[offset]
			}
			if sd.Fields == nil {
				continue
			}
			for f, v := range sd.Fields[j] {
				if tnt, ok := fieldToValueType[f]; ok {
					tagValues[tnt.fieldName] = mustDecodeTagValue(tnt.typ, v)
				} else {
					logger.Panicf("unknown field %s not found in fieldToValueType", f)
				}
			}
		}
	}
	return sl, tables, storedIndexValue, newTagProjection, nil
}