// banyand/stream/block_scanner.go

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package stream import ( "context" "fmt" "sort" "github.com/apache/skywalking-banyandb/api/common" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/posting" "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/query" logicalstream "github.com/apache/skywalking-banyandb/pkg/query/logical/stream" ) const blockScannerBatchSize = 32 type blockScanResult struct { p *part qo queryOptions bm blockMetadata } func (bs *blockScanResult) reset() { bs.p = nil bs.qo.reset() bs.bm.reset() } type blockScanResultBatch struct { err error bss []blockScanResult } func (bsb *blockScanResultBatch) reset() { bsb.err = nil for i := range bsb.bss { bsb.bss[i].reset() } bsb.bss = bsb.bss[:0] } func generateBlockScanResultBatch() 
*blockScanResultBatch { v := blockScanResultBatchPool.Get() if v == nil { return &blockScanResultBatch{ bss: make([]blockScanResult, 0, blockScannerBatchSize), } } return v } func releaseBlockScanResultBatch(bsb *blockScanResultBatch) { bsb.reset() blockScanResultBatchPool.Put(bsb) } var blockScanResultBatchPool = pool.Register[*blockScanResultBatch]("stream-blockScannerBatch") func searchSeries(ctx context.Context, qo queryOptions, segment storage.Segment[*tsTable, *option], series []*pbv1.Series) (queryOptions, error) { seriesFilter := roaring.NewPostingList() sl, err := segment.Lookup(ctx, series) if err != nil { return qo, err } for i := range sl { if seriesFilter.Contains(uint64(sl[i].ID)) { continue } seriesFilter.Insert(uint64(sl[i].ID)) if qo.seriesToEntity == nil { qo.seriesToEntity = make(map[common.SeriesID][]*modelv1.TagValue) } qo.seriesToEntity[sl[i].ID] = sl[i].EntityValues qo.sortedSids = append(qo.sortedSids, sl[i].ID) } if seriesFilter.IsEmpty() { return qo, nil } sort.Slice(qo.sortedSids, func(i, j int) bool { return qo.sortedSids[i] < qo.sortedSids[j] }) return qo, nil } func getBlockScanner(ctx context.Context, segment storage.Segment[*tsTable, *option], qo queryOptions, l *logger.Logger, pm *protector.Memory, tr *index.RangeOpts, ) (bc *blockScanner, err error) { tabs := segment.Tables() finalizers := make([]scanFinalizer, 0, len(tabs)+1) finalizers = append(finalizers, segment.DecRef) defer func() { if bc == nil || err != nil { for i := range finalizers { finalizers[i]() } } }() var parts []*part var size, offset int filterIndex := make(map[uint64]posting.List) for i := range tabs { filter, filterTS, err := search(ctx, qo, qo.sortedSids, tabs[i], tr) if err != nil { return nil, err } if filter != nil && filter.IsEmpty() { continue } minTimestamp, maxTimestamp := updateTimeRange(filterTS, qo.minTimestamp, qo.maxTimestamp) snp := tabs[i].currentSnapshot() parts, size = snp.getParts(parts, minTimestamp, maxTimestamp) if size < 1 { snp.decRef() 
continue } finalizers = append(finalizers, snp.decRef) for j := offset; j < offset+size; j++ { filterIndex[parts[j].partMetadata.ID] = filter } offset += size } if len(parts) < 1 { return nil, nil } var asc bool if qo.Order == nil { asc = true } else { asc = qo.Order.Sort == modelv1.Sort_SORT_ASC || qo.Order.Sort == modelv1.Sort_SORT_UNSPECIFIED } return &blockScanner{ parts: getDisjointParts(parts, asc), filterIndex: filterIndex, qo: qo, asc: asc, l: l, pm: pm, finalizers: finalizers, }, nil } func search(ctx context.Context, qo queryOptions, seriesList []common.SeriesID, tw *tsTable, tr *index.RangeOpts) (pl posting.List, plTS posting.List, err error) { if qo.Filter == nil || qo.Filter == logicalstream.ENode { return nil, nil, nil } tracer := query.GetTracer(ctx) if tracer != nil { span, _ := tracer.StartSpan(ctx, "scan local index") span.Tagf("sids", "%d", len(seriesList)) span.Tag("tab", tw.p.String()) defer func() { if pl != nil { span.Tagf("got", "%d", pl.Len()) } if err != nil { span.Error(err) } span.Stop() }() } sid := make([]uint64, len(seriesList)) for i := range seriesList { sid[i] = uint64(seriesList[i]) } pl, plTS, err = tw.Index().Search(ctx, sid, qo.Filter, tr) if err != nil { return nil, nil, err } if pl == nil { return roaring.DummyPostingList, roaring.DummyPostingList, nil } return pl, plTS, nil } type scanFinalizer func() type blockScanner struct { filterIndex map[uint64]posting.List l *logger.Logger pm *protector.Memory parts [][]*part finalizers []scanFinalizer qo queryOptions asc bool } func (bsn *blockScanner) scan(ctx context.Context, blockCh chan *blockScanResultBatch) { if len(bsn.parts) < 1 { return } var parts []*part if bsn.asc { parts = bsn.parts[0] bsn.parts = bsn.parts[1:] } else { parts = bsn.parts[len(bsn.parts)-1] bsn.parts = bsn.parts[:len(bsn.parts)-1] } bma := generateBlockMetadataArray() defer releaseBlockMetadataArray(bma) ti := generateTstIter() defer releaseTstIter(ti) ti.init(bma, parts, bsn.qo.sortedSids, 
bsn.qo.minTimestamp, bsn.qo.maxTimestamp) batch := generateBlockScanResultBatch() if ti.Error() != nil { batch.err = fmt.Errorf("cannot init tstIter: %w", ti.Error()) select { case blockCh <- batch: case <-ctx.Done(): releaseBlockScanResultBatch(batch) bsn.l.Warn().Err(ti.Error()).Msg("cannot init tstIter") } return } for ti.nextBlock() { p := ti.piHeap[0] batch.bss = append(batch.bss, blockScanResult{ p: p.p, }) bs := &batch.bss[len(batch.bss)-1] bs.qo.copyFrom(&bsn.qo) bs.qo.elementFilter = bsn.filterIndex[p.p.partMetadata.ID] bs.bm.copyFrom(p.curBlock) if len(batch.bss) >= cap(batch.bss) { var totalBlockBytes uint64 for i := range batch.bss { totalBlockBytes += batch.bss[i].bm.uncompressedSizeBytes } if err := bsn.pm.AcquireResource(ctx, totalBlockBytes); err != nil { batch.err = fmt.Errorf("cannot acquire resource: %w", err) select { case blockCh <- batch: case <-ctx.Done(): releaseBlockScanResultBatch(batch) bsn.l.Warn().Err(err).Msg("cannot acquire resource") } return } select { case blockCh <- batch: case <-ctx.Done(): releaseBlockScanResultBatch(batch) bsn.l.Warn().Int("batch.len", len(batch.bss)).Msg("context canceled while sending block") return } batch = generateBlockScanResultBatch() } } if ti.Error() != nil { batch.err = fmt.Errorf("cannot iterate tstIter: %w", ti.Error()) select { case blockCh <- batch: case <-ctx.Done(): releaseBlockScanResultBatch(batch) } return } if len(batch.bss) > 0 { select { case blockCh <- batch: case <-ctx.Done(): releaseBlockScanResultBatch(batch) } return } releaseBlockScanResultBatch(batch) } func (bsn *blockScanner) close() { for i := range bsn.finalizers { bsn.finalizers[i]() } }