func()

in banyand/measure/query.go [73:180]


// Query runs a measure query described by mqo and returns its result.
//
// The options are validated first: mqo.TimeRange must be set and at least one
// of mqo.TagProjection / mqo.FieldProjection must be non-empty. The TSDB is
// taken from s.tsdb, or loaded through s.schemaRepo and cached there when it
// has not been stored yet. Segments overlapping the time range are then
// selected; when none match, nilResult is returned.
//
// For index-mode measures the query is delegated to buildIndexQueryResult.
// Otherwise mqo.Entities must be non-empty; one series is built per entity,
// the matching series IDs are looked up, the parts overlapping the time range
// are collected from each table's current snapshot, and the blocks are
// searched. The ordering flags of the result are derived from mqo.Order
// (ascending by timestamp when no order is given).
//
// Segment references obtained from SelectSegments are dropped with DecRef on
// the paths in this function that return without handing them to a result.
func (s *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr model.MeasureQueryResult, err error) {
	if mqo.TimeRange == nil {
		return nil, errors.New("invalid query options: timeRange are required")
	}
	if len(mqo.TagProjection) == 0 && len(mqo.FieldProjection) == 0 {
		return nil, errors.New("invalid query options: tagProjection or fieldProjection is required")
	}

	// Use the cached TSDB when present; otherwise load it and cache it.
	var tsdb storage.TSDB[*tsTable, option]
	db := s.tsdb.Load()
	if db == nil {
		tsdb, err = s.schemaRepo.loadTSDB(s.group)
		if err != nil {
			return nil, err
		}
		s.tsdb.Store(tsdb)
	} else {
		tsdb = db.(storage.TSDB[*tsTable, option])
	}

	segments, err := tsdb.SelectSegments(*mqo.TimeRange)
	if err != nil {
		return nil, err
	}
	if len(segments) < 1 {
		return nilResult, nil
	}
	// releaseSegments drops the references held on the selected segments. It
	// is used on early-return paths where no result takes the segments over.
	releaseSegments := func() {
		for i := range segments {
			segments[i].DecRef()
		}
	}

	if s.schema.IndexMode {
		// NOTE(review): segments are handed to buildIndexQueryResult;
		// presumably it is responsible for releasing them — confirm.
		return s.buildIndexQueryResult(ctx, mqo, segments)
	}

	if len(mqo.Entities) < 1 {
		// The segments were already selected above; release them so this
		// validation failure does not leak their references.
		releaseSegments()
		return nil, errors.New("invalid query options: series is required")
	}

	series := make([]*pbv1.Series, len(mqo.Entities))
	for i := range mqo.Entities {
		series[i] = &pbv1.Series{
			Subject:      mqo.Name,
			EntityValues: mqo.Entities[i],
		}
	}

	sids, tables, storedIndexValue, newTagProjection, err := s.searchSeriesList(ctx, series, mqo, segments)
	if err != nil {
		// NOTE(review): segments are not released on this path; confirm
		// whether searchSeriesList drops the references itself on failure.
		return nil, err
	}
	if len(sids) < 1 {
		releaseSegments()
		return nilResult, nil
	}

	// The result keeps the original tag projection; the query options passed
	// to the block search use the projection returned by searchSeriesList.
	result := queryResult{
		ctx:              ctx,
		segments:         segments,
		tagProjection:    mqo.TagProjection,
		storedIndexValue: storedIndexValue,
	}
	// From here on the result holds the segments (and, below, the snapshots),
	// so any failure releases them through the result.
	defer func() {
		if err != nil {
			result.Release()
		}
	}()
	mqo.TagProjection = newTagProjection
	var parts []*part
	qo := queryOptions{
		MeasureQueryOptions: mqo,
		minTimestamp:        mqo.TimeRange.Start.UnixNano(),
		maxTimestamp:        mqo.TimeRange.End.UnixNano(),
	}
	var n int
	for i := range tables {
		// Named snp (not s) to avoid shadowing the method receiver.
		snp := tables[i].currentSnapshot()
		if snp == nil {
			continue
		}
		parts, n = snp.getParts(parts, qo.minTimestamp, qo.maxTimestamp)
		if n < 1 {
			// No part of this snapshot overlaps the time range; drop it now.
			snp.decRef()
			continue
		}
		result.snapshots = append(result.snapshots, snp)
	}

	if err = s.searchBlocks(ctx, &result, sids, parts, qo); err != nil {
		return nil, err
	}

	if mqo.Order == nil {
		// No explicit order: ascending by timestamp.
		result.ascTS = true
		result.orderByTS = true
	} else {
		if mqo.Order.Sort == modelv1.Sort_SORT_ASC || mqo.Order.Sort == modelv1.Sort_SORT_UNSPECIFIED {
			result.ascTS = true
		}
		switch mqo.Order.Type {
		case index.OrderByTypeTime:
			result.orderByTS = true
		case index.OrderByTypeIndex, index.OrderByTypeSeries:
			result.orderByTS = false
		}
	}

	return &result, nil
}