func()

in pkg/query/logical/stream/stream_plan_indexscan_global.go [78:134]


// executeForShard resolves every term of t.expr against the index of shard and
// materializes each matching item as a stream element.
//
// For each term it seeks the index using the global series ID of the schema
// scope and the ID of t.globalIndexRule. Each item ID returned by the seek is
// resolved through ec to the shard and series it names; the item is then
// fetched under a bounded timeout, projected with t.projectionTagRefs, and
// appended to the result with its element ID and timestamp.
//
// It returns the collected elements, or a nil slice and a stack-annotated
// error as soon as any step fails. No partial result is returned on failure.
func (t *globalIndexScan) executeForShard(ec executor.StreamExecutionContext, shard tsdb.Shard) ([]*streamv1.Element, error) {
	// Upper bound for fetching a single item from its series.
	const itemFetchTimeout = 5 * time.Second

	var elementsInShard []*streamv1.Element
	for _, term := range t.expr.Bytes() {
		itemIDs, err := shard.Index().Seek(index.Field{
			Key: index.FieldKey{
				SeriesID:    tsdb.GlobalSeriesID(t.schema.Scope()),
				IndexRuleID: t.globalIndexRule.GetMetadata().GetId(),
			},
			Term: term,
		})
		if err != nil {
			return nil, errors.WithStack(err)
		}
		for _, itemID := range itemIDs {
			// The item may live in a different shard than the one whose index
			// was searched, so look the shard up by the ID carried in the item ID.
			segShard, err := ec.Shard(itemID.ShardID)
			if err != nil {
				return nil, errors.WithStack(err)
			}
			series, err := segShard.Series().GetByID(itemID.SeriesID)
			if err != nil {
				return nil, errors.WithStack(err)
			}
			// Run the fetch in a closure so the deferred cancel and Close fire
			// at the end of every iteration instead of piling up until return.
			err = func() error {
				ctx, cancel := context.WithTimeout(context.Background(), itemFetchTimeout)
				defer cancel()
				item, closer, errInner := series.Get(ctx, itemID)
				// Registered before the error check so a non-nil closer is
				// released even when Get reports a failure.
				defer func(closer io.Closer) {
					if closer != nil {
						// Best-effort cleanup: a Close failure must not mask the result.
						_ = closer.Close()
					}
				}(closer)
				if errInner != nil {
					return errors.WithStack(errInner)
				}
				tagFamilies, errInner := logical.ProjectItem(ec, item, t.projectionTagRefs)
				if errInner != nil {
					return errors.WithStack(errInner)
				}
				elementID, errInner := ec.ParseElementID(item)
				if errInner != nil {
					return errors.WithStack(errInner)
				}
				elementsInShard = append(elementsInShard, &streamv1.Element{
					ElementId: elementID,
					// item.Time() is passed as the nanosecond argument of time.Unix.
					Timestamp:   timestamppb.New(time.Unix(0, int64(item.Time()))),
					TagFamilies: tagFamilies,
				})
				return nil
			}()
			if err != nil {
				// Already stack-annotated inside the closure.
				return nil, err
			}
		}
	}

	return elementsInShard, nil
}