func()

in pkg/query/logical/stream/stream_plan_indexscan_local.go [65:144]


// Execute runs the local index scan and returns the matching stream elements.
//
// It lists the series of every entity in i.entities on each shard reported by
// ec, merges them into a single series list and seeks through that list within
// i.timeRange. Items are ordered by i.order.Index when it is set and by time
// otherwise, both with i.order.Sort, and i.filter is applied when it is
// non-nil. Every item is projected with i.projectionTagRefs and converted into
// a streamv1.Element.
//
// The scan stops as soon as i.maxElementSize elements have been collected.
// Because the cap is checked after an element is appended, one element is
// still returned when i.maxElementSize is zero or negative.
//
// A nil slice and a nil error are returned when no series or no iterators are
// found. A failure while resolving shards, listing series, seeking, projecting
// an item or parsing an element ID aborts the scan with a nil slice. Errors
// raised while closing the item iterator and the seek closers are appended to
// the returned error.
func (i *localIndexScan) Execute(ec executor.StreamExecutionContext) (elements []*streamv1.Element, err error) {
	// The context only carries the logger, so one instance serves every shard.
	ctx := context.WithValue(context.Background(), logger.ContextKey, i.l)

	var seriesList tsdb.SeriesList
	for _, e := range i.entities {
		shards, errInternal := ec.Shards(e)
		if errInternal != nil {
			return nil, errInternal
		}
		for _, shard := range shards {
			sl, errInternal := shard.Series().List(ctx, tsdb.NewPath(e))
			if errInternal != nil {
				return nil, errInternal
			}
			seriesList = seriesList.Merge(sl)
		}
	}
	if len(seriesList) == 0 {
		return nil, nil
	}

	var builders []logical.SeekerBuilder
	if i.order.Index != nil {
		builders = append(builders, func(builder tsdb.SeekerBuilder) {
			builder.OrderByIndex(i.order.Index, i.order.Sort)
		})
	} else {
		builders = append(builders, func(builder tsdb.SeekerBuilder) {
			builder.OrderByTime(i.order.Sort)
		})
	}
	if i.filter != nil {
		builders = append(builders, func(b tsdb.SeekerBuilder) {
			b.Filter(i.filter)
		})
	}

	iters, closers, err := logical.ExecuteForShard(i.l, seriesList, i.timeRange, builders...)
	if err != nil {
		return nil, err
	}
	// Registered before the item iterator's Close so that, in LIFO defer
	// order, the iterator is closed first and the closers afterwards.
	defer func() {
		for _, c := range closers {
			err = multierr.Append(err, c.Close())
		}
	}()

	if len(iters) == 0 {
		return nil, nil
	}

	it := logical.NewItemIter(iters, i.order.Sort)
	defer func() {
		err = multierr.Append(err, it.Close())
	}()
	for it.HasNext() {
		nextItem := it.Next()
		tagFamilies, innerErr := logical.ProjectItem(ec, nextItem, i.projectionTagRefs)
		if innerErr != nil {
			return nil, innerErr
		}
		elementID, innerErr := ec.ParseElementID(nextItem)
		if innerErr != nil {
			return nil, innerErr
		}
		elements = append(elements, &streamv1.Element{
			ElementId:   elementID,
			Timestamp:   timestamppb.New(time.Unix(0, int64(nextItem.Time()))),
			TagFamilies: tagFamilies,
		})
		// Stop exactly at the cap; a strict ">" here would let one extra
		// element through.
		if len(elements) >= i.maxElementSize {
			break
		}
	}
	return elements, nil
}