pkg/query/logical/stream/stream_plan_indexscan_global.go (106 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package stream import ( "context" "fmt" "io" "time" "github.com/pkg/errors" "google.golang.org/protobuf/types/known/timestamppb" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/query/executor" "github.com/apache/skywalking-banyandb/pkg/query/logical" ) var _ logical.Plan = (*globalIndexScan)(nil) type globalIndexScan struct { schema logical.Schema metadata *commonv1.Metadata globalIndexRule *databasev1.IndexRule expr logical.LiteralExpr projectionTagRefs [][]*logical.TagRef } func (t *globalIndexScan) String() string { return fmt.Sprintf("GlobalIndexScan: Metadata{group=%s,name=%s},conditions=%s; projection=%s", t.metadata.GetGroup(), t.metadata.GetName(), t.expr.String(), logical.FormatTagRefs(", ", t.projectionTagRefs...)) } func (t *globalIndexScan) Children() []logical.Plan { return []logical.Plan{} } func (t *globalIndexScan) Schema() logical.Schema { return t.schema } func (t *globalIndexScan) Execute(ec executor.StreamExecutionContext) ([]*streamv1.Element, error) { shards, err := ec.Shards(nil) if err != nil { return nil, err } var elements []*streamv1.Element for _, shard := range shards { elementsInShard, shardErr := t.executeForShard(ec, shard) if shardErr != nil { return elements, shardErr } elements = append(elements, elementsInShard...) } return elements, nil } func (t *globalIndexScan) executeForShard(ec executor.StreamExecutionContext, shard tsdb.Shard) ([]*streamv1.Element, error) { var elementsInShard []*streamv1.Element for _, term := range t.expr.Bytes() { itemIDs, err := shard.Index().Seek(index.Field{ Key: index.FieldKey{ SeriesID: tsdb.GlobalSeriesID(t.schema.Scope()), IndexRuleID: t.globalIndexRule.GetMetadata().GetId(), }, Term: term, }) if err != nil { return nil, err } for _, itemID := range itemIDs { segShard, err := ec.Shard(itemID.ShardID) if err != nil { return elementsInShard, errors.WithStack(err) } series, err := segShard.Series().GetByID(itemID.SeriesID) if err != nil { return elementsInShard, errors.WithStack(err) } err = func() error { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() item, closer, errInner := series.Get(ctx, itemID) defer func(closer io.Closer) { if closer != nil { _ = closer.Close() } }(closer) if errInner != nil { return errors.WithStack(errInner) } tagFamilies, errInner := logical.ProjectItem(ec, item, t.projectionTagRefs) if errInner != nil { return errors.WithStack(errInner) } elementID, errInner := ec.ParseElementID(item) if errInner != nil { return errors.WithStack(errInner) } elementsInShard = append(elementsInShard, &streamv1.Element{ ElementId: elementID, Timestamp: timestamppb.New(time.Unix(0, int64(item.Time()))), TagFamilies: tagFamilies, }) return nil }() if err != nil { return nil, err } } } return elementsInShard, nil }