func()

in pkg/query/logical/measure/measure_plan_indexscan_local.go [125:191]


// Execute runs the local index scan and returns an iterator over its result.
//
// For every entity in i.entities it asks ec for the shards and searches each
// shard's series with i.filter, merging all hits into one series list. When
// i.order.Index is set, the search is ordered by that index; otherwise the
// seekers are ordered by time using i.order.Sort. The merged series list is
// then handed to logical.ExecuteForShard together with i.timeRange.
//
// dummyIter is returned when no series matched or when no iterators were
// produced. Otherwise the result is a per-series iterator if i.groupByEntity
// is set, or an index-scan iterator over the merged items.
//
// Any closers returned by logical.ExecuteForShard are closed when Execute
// returns; their Close errors are appended to the returned err.
func (i *localIndexScan) Execute(ec executor.MeasureExecutionContext) (mit executor.MIterator, err error) {
	sortByIndex := i.order.Index != nil

	var indexOrder *tsdb.OrderBy
	if sortByIndex {
		indexOrder = &tsdb.OrderBy{Index: i.order.Index, Sort: i.order.Sort}
	}

	// The search context only carries the logger, so one instance serves every shard.
	searchCtx := context.WithValue(context.Background(), logger.ContextKey, i.l)

	var matched tsdb.SeriesList
	for _, entity := range i.entities {
		shards, shardsErr := ec.Shards(entity)
		if shardsErr != nil {
			return nil, shardsErr
		}
		for _, shard := range shards {
			found, searchErr := shard.Series().Search(searchCtx, tsdb.NewPath(entity), i.filter, indexOrder)
			if searchErr != nil {
				return nil, searchErr
			}
			matched = matched.Merge(found)
		}
	}
	if len(matched) == 0 {
		return dummyIter, nil
	}

	var seekerOpts []logical.SeekerBuilder
	if !sortByIndex {
		// Without an index to sort by, fall back to ordering by time.
		seekerOpts = append(seekerOpts, func(sb tsdb.SeekerBuilder) {
			sb.OrderByTime(i.order.Sort)
		})
	}

	// CAVEAT: the order of series list matters when sorting by an index.
	iters, closers, err := logical.ExecuteForShard(i.l, matched, i.timeRange, seekerOpts...)
	if err != nil {
		return nil, err
	}
	// Runs after the return values are set, so Close failures are folded into
	// the named result err. Ranging over an empty slice is a no-op.
	defer func(pending []io.Closer) {
		for _, closer := range pending {
			err = multierr.Append(err, closer.Close())
		}
	}(closers)

	if len(iters) == 0 {
		return dummyIter, nil
	}

	tc := transformContext{
		ec:                   ec,
		projectionTagsRefs:   i.projectionTagsRefs,
		projectionFieldsRefs: i.projectionFieldsRefs,
	}
	if i.groupByEntity {
		return newSeriesMIterator(iters, tc, i.maxDataPointsSize), nil
	}
	return newIndexScanIterator(logical.NewItemIter(iters, i.order.Sort), tc, i.maxDataPointsSize), nil
}