pkg/query/logical/stream/stream_plan_indexscan_local.go (131 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package stream import ( "context" "encoding/hex" "fmt" "time" "google.golang.org/protobuf/types/known/timestamppb" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/query/executor" "github.com/apache/skywalking-banyandb/pkg/query/logical" "github.com/apache/skywalking-banyandb/pkg/query/model" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) var ( _ logical.Plan = (*localIndexScan)(nil) _ logical.Sorter = (*localIndexScan)(nil) _ logical.VolumeLimiter = (*localIndexScan)(nil) _ executor.StreamExecutable = (*localIndexScan)(nil) ) type localIndexScan struct { schema logical.Schema filter index.Filter result model.StreamQueryResult order *logical.OrderBy metadata *commonv1.Metadata l *logger.Logger timeRange timestamp.TimeRange projectionTagRefs [][]*logical.TagRef projectionTags []model.TagProjection entities [][]*modelv1.TagValue maxElementSize int } func (i *localIndexScan) Close() { if i.result != nil { i.result.Release() } } func (i *localIndexScan) Limit(maxVal int) { i.maxElementSize = maxVal } func (i *localIndexScan) Sort(order *logical.OrderBy) { i.order = order } func (i *localIndexScan) Execute(ctx context.Context) ([]*streamv1.Element, error) { select { case <-ctx.Done(): return nil, ctx.Err() default: } if i.result != nil { return BuildElementsFromStreamResult(ctx, i.result) } var orderBy *index.OrderBy if i.order != nil { orderBy = &index.OrderBy{ Index: i.order.Index, Sort: i.order.Sort, } } ec := executor.FromStreamExecutionContext(ctx) var err error if i.result, err = ec.Query(ctx, model.StreamQueryOptions{ Name: i.metadata.GetName(), TimeRange: &i.timeRange, Entities: i.entities, Filter: i.filter, Order: orderBy, TagProjection: i.projectionTags, MaxElementSize: i.maxElementSize, }); err != nil { return nil, err } if i.result == nil { return nil, nil } return BuildElementsFromStreamResult(ctx, i.result) } func (i *localIndexScan) String() string { return fmt.Sprintf("IndexScan: startTime=%d,endTime=%d,Metadata{group=%s,name=%s},conditions=%s; projection=%s; orderBy=%s; limit=%d", i.timeRange.Start.Unix(), i.timeRange.End.Unix(), i.metadata.GetGroup(), i.metadata.GetName(), i.filter, logical.FormatTagRefs(", ", i.projectionTagRefs...), i.order, i.maxElementSize) } func (i *localIndexScan) Children() []logical.Plan { return []logical.Plan{} } func (i *localIndexScan) Schema() logical.Schema { if len(i.projectionTagRefs) == 0 { return i.schema } return i.schema.ProjTags(i.projectionTagRefs...) } // BuildElementsFromStreamResult builds a slice of elements from the given stream query result. func BuildElementsFromStreamResult(ctx context.Context, result model.StreamQueryResult) (elements []*streamv1.Element, err error) { var r *model.StreamResult for { r = result.Pull(ctx) if r == nil { return nil, nil } if r.Error != nil { return nil, r.Error } if len(r.Timestamps) > 0 { break } } for i := range r.Timestamps { e := &streamv1.Element{ Timestamp: timestamppb.New(time.Unix(0, r.Timestamps[i])), ElementId: hex.EncodeToString(convert.Uint64ToBytes(r.ElementIDs[i])), } for _, tf := range r.TagFamilies { tagFamily := &modelv1.TagFamily{ Name: tf.Name, } e.TagFamilies = append(e.TagFamilies, tagFamily) for _, t := range tf.Tags { tagFamily.Tags = append(tagFamily.Tags, &modelv1.Tag{ Key: t.Name, Value: t.Values[i], }) } } elements = append(elements, e) } return elements, nil }