in pkg/query/logical/stream/stream_plan_indexscan_local.go [65:144]
// Execute runs the local index scan and returns the matching stream elements.
//
// It lists the series of every entity in i.entities across the shards reported
// by ec, builds seekers ordered by i.order (by index when i.order.Index is set,
// otherwise by time) with i.filter applied when present, and walks the merged
// iterator, converting each item into a streamv1.Element with the projected
// tag families.
//
// Iteration stops as soon as i.maxElementSize elements have been collected.
// A nil slice is returned when no series or no iterators are found. Errors
// raised while closing the seekers' closers or the merged iterator are
// appended to the returned error.
func (i *localIndexScan) Execute(ec executor.StreamExecutionContext) (elements []*streamv1.Element, err error) {
	// The lookup context does not depend on the entity or shard, so build it once.
	ctx := context.WithValue(context.Background(), logger.ContextKey, i.l)

	var seriesList tsdb.SeriesList
	for _, e := range i.entities {
		shards, errInternal := ec.Shards(e)
		if errInternal != nil {
			return nil, errInternal
		}
		for _, shard := range shards {
			sl, errInternal := shard.Series().List(ctx, tsdb.NewPath(e))
			if errInternal != nil {
				return nil, errInternal
			}
			seriesList = seriesList.Merge(sl)
		}
	}
	if len(seriesList) == 0 {
		return nil, nil
	}

	var builders []logical.SeekerBuilder
	if i.order.Index != nil {
		builders = append(builders, func(builder tsdb.SeekerBuilder) {
			builder.OrderByIndex(i.order.Index, i.order.Sort)
		})
	} else {
		builders = append(builders, func(builder tsdb.SeekerBuilder) {
			builder.OrderByTime(i.order.Sort)
		})
	}
	if i.filter != nil {
		builders = append(builders, func(b tsdb.SeekerBuilder) {
			b.Filter(i.filter)
		})
	}

	iters, closers, err := logical.ExecuteForShard(i.l, seriesList, i.timeRange, builders...)
	if err != nil {
		return nil, err
	}
	if len(closers) > 0 {
		// err is the named result, so close failures reach the caller.
		defer func(closers []io.Closer) {
			for _, c := range closers {
				err = multierr.Append(err, c.Close())
			}
		}(closers)
	}

	var elems []*streamv1.Element
	if len(iters) == 0 {
		return elems, nil
	}

	it := logical.NewItemIter(iters, i.order.Sort)
	defer func() {
		err = multierr.Append(err, it.Close())
	}()
	for it.HasNext() {
		nextItem := it.Next()
		tagFamilies, innerErr := logical.ProjectItem(ec, nextItem, i.projectionTagRefs)
		if innerErr != nil {
			return nil, innerErr
		}
		elementID, innerErr := ec.ParseElementID(nextItem)
		if innerErr != nil {
			return nil, innerErr
		}
		elems = append(elems, &streamv1.Element{
			ElementId:   elementID,
			Timestamp:   timestamppb.New(time.Unix(0, int64(nextItem.Time()))),
			TagFamilies: tagFamilies,
		})
		// Stop once the cap is reached; the previous `>` comparison let one
		// element beyond maxElementSize through.
		// NOTE(review): a non-positive maxElementSize still yields a single
		// element, exactly as before — confirm whether that is intended.
		if len(elems) >= i.maxElementSize {
			break
		}
	}
	return elems, nil
}