banyand/stream/query_by_idx.go (288 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package stream import ( "context" "fmt" "sort" "github.com/pkg/errors" "github.com/apache/skywalking-banyandb/api/common" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring" itersort "github.com/apache/skywalking-banyandb/pkg/iter/sort" "github.com/apache/skywalking-banyandb/pkg/query" "github.com/apache/skywalking-banyandb/pkg/query/model" ) type idxResult struct { sortingIter itersort.Iterator[*index.DocumentResult] sm *stream pm *protector.Memory tabs []*tsTable elementIDsSorted []uint64 data []*blockCursor snapshots []*snapshot segments []storage.Segment[*tsTable, option] qo queryOptions loaded bool asc bool } func (qr *idxResult) Pull(ctx context.Context) *model.StreamResult { if !qr.loaded { qr.elementIDsSorted = make([]uint64, 0, qr.qo.MaxElementSize) return qr.loadSortingData(ctx) } if v := qr.nextValue(); v != nil { return v } qr.loaded = false return qr.loadSortingData(ctx) } func (qr *idxResult) scanParts(ctx context.Context, qo queryOptions) error { var parts []*part var n int for i := range qr.tabs { s := qr.tabs[i].currentSnapshot() if s == nil { continue } parts, n = s.getParts(parts, qo.minTimestamp, qo.maxTimestamp) if n < 1 { s.decRef() continue } qr.snapshots = append(qr.snapshots, s) } bma := generateBlockMetadataArray() defer releaseBlockMetadataArray(bma) defFn := startBlockScanSpan(ctx, len(qo.sortedSids), parts, qr) defer defFn() ti := generateTstIter() defer releaseTstIter(ti) sids := qo.sortedSids ti.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp) if ti.Error() != nil { return fmt.Errorf("cannot init tstIter: %w", ti.Error()) } var hit int var totalBlockBytes uint64 for ti.nextBlock() { if hit%checkDoneEvery == 0 { select { case <-ctx.Done(): return errors.WithMessagef(ctx.Err(), "interrupt: scanned %d blocks, remained %d/%d parts to scan", len(qr.data), len(ti.piHeap), len(ti.piPool)) default: } } hit++ bc := generateBlockCursor() p := ti.piHeap[0] bc.init(p.p, p.curBlock, qo) qr.data = append(qr.data, bc) totalBlockBytes += bc.bm.uncompressedSizeBytes } if ti.Error() != nil { return fmt.Errorf("cannot iterate tstIter: %w", ti.Error()) } if err := qr.pm.AcquireResource(ctx, totalBlockBytes); err != nil { return fmt.Errorf("cannot acquire resource: %w", err) } return nil } func (qr *idxResult) load(ctx context.Context, qo queryOptions) *model.StreamResult { if qr.loaded { qr.nextValue() } if err := qr.scanParts(ctx, qo); err != nil { return &model.StreamResult{ Error: err, } } if len(qr.data) == 0 { return nil } cursorChan := make(chan int, len(qr.data)) for i := 0; i < len(qr.data); i++ { go func(i int) { select { case <-ctx.Done(): releaseBlockCursor(qr.data[i]) cursorChan <- i return default: } if qr.sm.schema.GetEntity() == nil || len(qr.sm.schema.GetEntity().GetTagNames()) == 0 { cursorChan <- -1 return } tmpBlock := generateBlock() defer releaseBlock(tmpBlock) if loadBlockCursor(qr.data[i], tmpBlock, qo, qr.sm) { cursorChan <- -1 return } cursorChan <- i }(i) } blankCursorList := []int{} for completed := 0; completed < len(qr.data); completed++ { result := <-cursorChan if result != -1 { blankCursorList = append(blankCursorList, result) } } select { case <-ctx.Done(): return &model.StreamResult{ Error: errors.WithMessagef(ctx.Err(), "interrupt: blank/total=%d/%d", len(blankCursorList), len(qr.data)), } default: } sort.Slice(blankCursorList, func(i, j int) bool { return blankCursorList[i] > blankCursorList[j] }) for _, index := range blankCursorList { qr.data = append(qr.data[:index], qr.data[index+1:]...) } qr.loaded = true return qr.nextValue() } func (qr *idxResult) nextValue() *model.StreamResult { if len(qr.data) == 0 { return nil } return qr.mergeByTagValue() } func (qr *idxResult) loadSortingData(ctx context.Context) *model.StreamResult { var qo queryOptions qo.StreamQueryOptions = qr.qo.StreamQueryOptions qo.elementFilter = roaring.NewPostingList() qo.seriesToEntity = qr.qo.seriesToEntity qr.elementIDsSorted = qr.elementIDsSorted[:0] count, searchedSize := 1, 0 tracer := query.GetTracer(ctx) if tracer != nil { span, _ := tracer.StartSpan(ctx, "load-sorting-data") span.Tagf("max_element_size", "%d", qo.MaxElementSize) if qr.qo.elementFilter != nil { span.Tag("filter_size", fmt.Sprintf("%d", qr.qo.elementFilter.Len())) } defer func() { span.Tagf("searched_size", "%d", searchedSize) span.Tagf("count", "%d", count) span.Stop() }() } for ; qr.sortingIter.Next(); count++ { searchedSize++ val := qr.sortingIter.Val() if qr.qo.elementFilter != nil && !qr.qo.elementFilter.Contains(val.DocID) { count-- continue } qo.elementFilter.Insert(val.DocID) if val.Timestamp > qo.maxTimestamp { qo.maxTimestamp = val.Timestamp } if val.Timestamp < qo.minTimestamp || qo.minTimestamp == 0 { qo.minTimestamp = val.Timestamp } qr.elementIDsSorted = append(qr.elementIDsSorted, val.DocID) // Insertion sort insertPos, found := -1, false for i, sid := range qo.sortedSids { if val.SeriesID == sid { found = true break } if val.SeriesID < sid { insertPos = i break } } if !found { if insertPos == -1 { qo.sortedSids = append(qo.sortedSids, val.SeriesID) } else { qo.sortedSids = append(qo.sortedSids[:insertPos], append([]common.SeriesID{val.SeriesID}, qo.sortedSids[insertPos:]...)...) } } if count >= qo.MaxElementSize { break } } if qo.elementFilter.IsEmpty() { return nil } return qr.load(ctx, qo) } func (qr *idxResult) releaseParts() { qr.releaseBlockCursor() for i := range qr.snapshots { qr.snapshots[i].decRef() } qr.snapshots = qr.snapshots[:0] } func (qr *idxResult) releaseBlockCursor() { for i, v := range qr.data { releaseBlockCursor(v) qr.data[i] = nil } qr.data = qr.data[:0] } func (qr *idxResult) Release() { qr.releaseParts() for i := range qr.segments { qr.segments[i].DecRef() } } func (qr *idxResult) mergeByTagValue() *model.StreamResult { defer qr.releaseBlockCursor() tmp := &model.StreamResult{} prevIdx := 0 elementIDToIdx := make(map[uint64]int) for _, data := range qr.data { data.copyAllTo(tmp, false) var idx int for idx = prevIdx; idx < len(tmp.Timestamps); idx++ { elementIDToIdx[tmp.ElementIDs[idx]] = idx } prevIdx = idx } r := &model.StreamResult{ TagFamilies: []model.TagFamily{}, } for _, tagFamily := range tmp.TagFamilies { tf := model.TagFamily{ Name: tagFamily.Name, Tags: []model.Tag{}, } for _, tag := range tagFamily.Tags { t := model.Tag{ Name: tag.Name, Values: []*modelv1.TagValue{}, } tf.Tags = append(tf.Tags, t) } r.TagFamilies = append(r.TagFamilies, tf) } for _, id := range qr.elementIDsSorted { idx, ok := elementIDToIdx[id] if !ok { continue } r.Timestamps = append(r.Timestamps, tmp.Timestamps[idx]) r.ElementIDs = append(r.ElementIDs, tmp.ElementIDs[idx]) for i := 0; i < len(r.TagFamilies); i++ { for j := 0; j < len(r.TagFamilies[i].Tags); j++ { r.TagFamilies[i].Tags[j].Values = append(r.TagFamilies[i].Tags[j].Values, tmp.TagFamilies[i].Tags[j].Values[idx]) } } } return r } var bypassQueryResultInstance = &bypassQueryResult{} type bypassQueryResult struct{} func (bypassQueryResult) Pull(context.Context) *model.StreamResult { return nil } func (bypassQueryResult) Release() {}