in pkg/query/logical/stream/stream_plan_indexscan_global.go [78:134]
func (t *globalIndexScan) executeForShard(ec executor.StreamExecutionContext, shard tsdb.Shard) ([]*streamv1.Element, error) {
var elementsInShard []*streamv1.Element
for _, term := range t.expr.Bytes() {
itemIDs, err := shard.Index().Seek(index.Field{
Key: index.FieldKey{
SeriesID: tsdb.GlobalSeriesID(t.schema.Scope()),
IndexRuleID: t.globalIndexRule.GetMetadata().GetId(),
},
Term: term,
})
if err != nil {
return nil, err
}
for _, itemID := range itemIDs {
segShard, err := ec.Shard(itemID.ShardID)
if err != nil {
return elementsInShard, errors.WithStack(err)
}
series, err := segShard.Series().GetByID(itemID.SeriesID)
if err != nil {
return elementsInShard, errors.WithStack(err)
}
err = func() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
item, closer, errInner := series.Get(ctx, itemID)
defer func(closer io.Closer) {
if closer != nil {
_ = closer.Close()
}
}(closer)
if errInner != nil {
return errors.WithStack(errInner)
}
tagFamilies, errInner := logical.ProjectItem(ec, item, t.projectionTagRefs)
if errInner != nil {
return errors.WithStack(errInner)
}
elementID, errInner := ec.ParseElementID(item)
if errInner != nil {
return errors.WithStack(errInner)
}
elementsInShard = append(elementsInShard, &streamv1.Element{
ElementId: elementID,
Timestamp: timestamppb.New(time.Unix(0, int64(item.Time()))),
TagFamilies: tagFamilies,
})
return nil
}()
if err != nil {
return nil, err
}
}
}
return elementsInShard, nil
}