banyand/stream/query_by_ts.go (260 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package stream import ( "container/heap" "context" "sync" "go.uber.org/multierr" "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/pkg/cgroups" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/query/model" ) var _ model.StreamQueryResult = (*tsResult)(nil) type tsResult struct { sm *stream pm *protector.Memory l *logger.Logger ts *blockScanner tr *index.RangeOpts segments []storage.Segment[*tsTable, option] series []*pbv1.Series shards []*model.StreamResult qo queryOptions asc bool } func (t *tsResult) Pull(ctx context.Context) (r *model.StreamResult) { if len(t.segments) == 0 && t.ts == nil { return nil } var err error if r, err = t.scan(ctx); err != nil { return &model.StreamResult{Error: err} } return r } func (t *tsResult) scan(ctx context.Context) (*model.StreamResult, error) { if t.ts != nil { return t.runTabScanner(ctx) } for len(t.segments) > 0 { var segment storage.Segment[*tsTable, option] if t.asc { segment = t.segments[len(t.segments)-1] t.segments = t.segments[:len(t.segments)-1] } else { segment = t.segments[0] t.segments = t.segments[1:] } qo, err := searchSeries(ctx, t.qo, segment, t.series) if err != nil { return nil, err } ts, err := getBlockScanner(ctx, segment, qo, t.l, t.pm, t.tr) if err != nil { return nil, err } if ts == nil { continue } t.ts = ts return t.runTabScanner(ctx) } return nil, nil } func (t *tsResult) runTabScanner(ctx context.Context) (*model.StreamResult, error) { workerSize := cgroups.CPUs() var workerWg sync.WaitGroup batchCh := make(chan *blockScanResultBatch, workerSize) workerWg.Add(workerSize) if t.shards == nil { t.shards = make([]*model.StreamResult, workerSize) for i := range t.shards { t.shards[i] = model.NewStreamResult(t.qo.MaxElementSize, t.asc) } } else { for i := range t.shards { t.shards[i].Reset() } } for i := range workerSize { go func(workerID int) { tmpBlock := generateBlock() defer releaseBlock(tmpBlock) blockHeap := generateBlockCursorHeap(t.asc) defer releaseBlockCursorHeap(blockHeap) tmpResult := model.NewStreamResult(t.qo.MaxElementSize, t.asc) for batch := range batchCh { if batch.err != nil { t.shards[workerID].Error = batch.err releaseBlockScanResultBatch(batch) continue } for _, bs := range batch.bss { bc := generateBlockCursor() bc.init(bs.p, &bs.bm, bs.qo) if loadBlockCursor(bc, tmpBlock, bs.qo, t.sm) { if !t.asc { bc.idx = len(bc.timestamps) - 1 } blockHeap.Push(bc) } } heap.Init(blockHeap) result := blockHeap.merge(t.qo.MaxElementSize) t.shards[workerID].CopyFrom(tmpResult, result) blockHeap.reset() releaseBlockScanResultBatch(batch) } workerWg.Done() }(i) } t.ts.scan(ctx, batchCh) close(batchCh) workerWg.Wait() if len(t.ts.parts) == 0 { t.ts.close() t.ts = nil } var err error for i := range t.shards { if t.shards[i].Error != nil { err = multierr.Append(err, t.shards[i].Error) } } if err != nil { return nil, err } return model.MergeStreamResults(t.shards, t.qo.MaxElementSize, t.asc), nil } func loadBlockCursor(bc *blockCursor, tmpBlock *block, qo queryOptions, sm *stream) bool { tmpBlock.reset() if !bc.loadData(tmpBlock) { releaseBlockCursor(bc) return false } entityValues := qo.seriesToEntity[bc.bm.seriesID] tagFamilyMap := make(map[string]int) for idx, tagFamily := range bc.tagFamilies { tagFamilyMap[tagFamily.name] = idx + 1 } is := sm.indexSchema.Load().(indexSchema) for _, tagFamilyProj := range bc.tagProjection { for j, tagProj := range tagFamilyProj.Names { tagSpec := is.tagMap[tagProj] if tagSpec.IndexedOnly { continue } entityPos := is.indexRuleLocators.EntitySet[tagProj] if entityPos == 0 { continue } tagFamilyPos := tagFamilyMap[tagFamilyProj.Family] if tagFamilyPos == 0 { bc.tagFamilies[tagFamilyPos-1] = tagFamily{ name: tagFamilyProj.Family, tags: make([]tag, 0), } } valueType := pbv1.MustTagValueToValueType(entityValues[entityPos-1]) bc.tagFamilies[tagFamilyPos-1].tags[j] = tag{ name: tagProj, values: mustEncodeTagValue(tagProj, tagSpec.GetType(), entityValues[entityPos-1], len(bc.timestamps)), valueType: valueType, } } } return true } func (t *tsResult) Release() { if t.ts != nil { t.ts.close() } for i := range t.segments { t.segments[i].DecRef() } } type blockCursorHeap struct { bcc []*blockCursor asc bool } func (bch blockCursorHeap) Len() int { return len(bch.bcc) } func (bch blockCursorHeap) Less(i, j int) bool { leftIdx, rightIdx := bch.bcc[i].idx, bch.bcc[j].idx leftTS := bch.bcc[i].timestamps[leftIdx] rightTS := bch.bcc[j].timestamps[rightIdx] if bch.asc { return leftTS < rightTS } return leftTS > rightTS } func (bch *blockCursorHeap) Swap(i, j int) { bch.bcc[i], bch.bcc[j] = bch.bcc[j], bch.bcc[i] } func (bch *blockCursorHeap) Push(x interface{}) { bch.bcc = append(bch.bcc, x.(*blockCursor)) } func (bch *blockCursorHeap) Pop() interface{} { old := bch.bcc n := len(old) x := old[n-1] bch.bcc = old[0 : n-1] releaseBlockCursor(x) return x } func (bch *blockCursorHeap) reset() { for i := range bch.bcc { releaseBlockCursor(bch.bcc[i]) } bch.bcc = bch.bcc[:0] } func (bch *blockCursorHeap) merge(limit int) *model.StreamResult { step := -1 if bch.asc { step = 1 } result := &model.StreamResult{} for bch.Len() > 0 { topBC := bch.bcc[0] topBC.copyTo(result) if result.Len() >= limit { break } topBC.idx += step if bch.asc { if topBC.idx >= len(topBC.timestamps) { heap.Pop(bch) } else { heap.Fix(bch, 0) } } else { if topBC.idx < 0 { heap.Pop(bch) } else { heap.Fix(bch, 0) } } } return result } var blockCursorHeapPool = pool.Register[*blockCursorHeap]("stream-blockCursorHeap") func generateBlockCursorHeap(asc bool) *blockCursorHeap { v := blockCursorHeapPool.Get() if v == nil { return &blockCursorHeap{ asc: asc, bcc: make([]*blockCursor, 0, blockScannerBatchSize), } } v.asc = asc return v } func releaseBlockCursorHeap(bch *blockCursorHeap) { bch.reset() blockCursorHeapPool.Put(bch) }