pkg/query/logical/stream/stream_plan_indexscan_local.go (130 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package stream import ( "context" "fmt" "io" "time" "go.uber.org/multierr" "google.golang.org/protobuf/types/known/timestamppb" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/query/executor" "github.com/apache/skywalking-banyandb/pkg/query/logical" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) var ( _ logical.Plan = (*localIndexScan)(nil) _ logical.Sorter = (*localIndexScan)(nil) _ logical.VolumeLimiter = (*localIndexScan)(nil) ) type localIndexScan struct { schema logical.Schema filter index.Filter order *logical.OrderBy metadata *commonv1.Metadata l *logger.Logger timeRange timestamp.TimeRange projectionTagRefs [][]*logical.TagRef entities []tsdb.Entity maxElementSize int } func (i *localIndexScan) Limit(max int) { i.maxElementSize = max } func (i *localIndexScan) Sort(order *logical.OrderBy) { i.order = order } func (i *localIndexScan) Execute(ec executor.StreamExecutionContext) (elements []*streamv1.Element, err error) { var seriesList tsdb.SeriesList for _, e := range i.entities { shards, errInternal := ec.Shards(e) if errInternal != nil { return nil, errInternal } for _, shard := range shards { sl, errInternal := shard.Series().List(context.WithValue( context.Background(), logger.ContextKey, i.l, ), tsdb.NewPath(e)) if errInternal != nil { return nil, errInternal } seriesList = seriesList.Merge(sl) } } if len(seriesList) == 0 { return nil, nil } var builders []logical.SeekerBuilder if i.order.Index != nil { builders = append(builders, func(builder tsdb.SeekerBuilder) { builder.OrderByIndex(i.order.Index, i.order.Sort) }) } else { builders = append(builders, func(builder tsdb.SeekerBuilder) { builder.OrderByTime(i.order.Sort) }) } if i.filter != nil { builders = append(builders, func(b tsdb.SeekerBuilder) { b.Filter(i.filter) }) } iters, closers, err := logical.ExecuteForShard(i.l, seriesList, i.timeRange, builders...) if err != nil { return nil, err } if len(closers) > 0 { defer func(closers []io.Closer) { for _, c := range closers { err = multierr.Append(err, c.Close()) } }(closers) } var elems []*streamv1.Element if len(iters) == 0 { return elems, nil } it := logical.NewItemIter(iters, i.order.Sort) defer func() { err = multierr.Append(err, it.Close()) }() for it.HasNext() { nextItem := it.Next() tagFamilies, innerErr := logical.ProjectItem(ec, nextItem, i.projectionTagRefs) if innerErr != nil { return nil, innerErr } elementID, innerErr := ec.ParseElementID(nextItem) if innerErr != nil { return nil, innerErr } elems = append(elems, &streamv1.Element{ ElementId: elementID, Timestamp: timestamppb.New(time.Unix(0, int64(nextItem.Time()))), TagFamilies: tagFamilies, }) if len(elems) > i.maxElementSize { break } } return elems, nil } func (i *localIndexScan) String() string { return fmt.Sprintf("IndexScan: startTime=%d,endTime=%d,Metadata{group=%s,name=%s},conditions=%s; projection=%s; orderBy=%s; limit=%d", i.timeRange.Start.Unix(), i.timeRange.End.Unix(), i.metadata.GetGroup(), i.metadata.GetName(), i.filter, logical.FormatTagRefs(", ", i.projectionTagRefs...), i.order, i.maxElementSize) } func (i *localIndexScan) Children() []logical.Plan { return []logical.Plan{} } func (i *localIndexScan) Schema() logical.Schema { if i.projectionTagRefs == nil || len(i.projectionTagRefs) == 0 { return i.schema } return i.schema.ProjTags(i.projectionTagRefs...) }