banyand/measure/query.go (796 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package measure import ( "bytes" "container/heap" "context" "fmt" "sort" "github.com/pkg/errors" "github.com/apache/skywalking-banyandb/api/common" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/query/model" resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) const ( preloadSize = 100 checkDoneEvery = 128 ) var nilResult = model.MeasureQueryResult(nil) // Query allow to retrieve measure data points. 
// Query is the entry point for retrieving measure data points from the
// schema repository and the underlying TSDB.
type Query interface {
	LoadGroup(name string) (resourceSchema.Group, bool)
	Measure(measure *commonv1.Metadata) (Measure, error)
	GetRemovalSegmentsTimeRange(group string) *timestamp.TimeRange
}

// Measure allows inspecting measure data points' details.
type Measure interface {
	Query(ctx context.Context, opts model.MeasureQueryOptions) (model.MeasureQueryResult, error)
	GetSchema() *databasev1.Measure
	GetIndexRules() []*databasev1.IndexRule
}

// Compile-time check that *measure satisfies Measure.
var _ Measure = (*measure)(nil)

// queryOptions bundles the caller-supplied query options with the
// precomputed nanosecond time bounds used to select parts and blocks.
type queryOptions struct {
	model.MeasureQueryOptions
	minTimestamp int64
	maxTimestamp int64
}

// Query executes a measure query. The overall flow is:
//  1. validate options (a time range plus at least one tag/field projection);
//  2. lazily load and cache the TSDB for this measure's group;
//  3. select the segments overlapping the time range;
//  4. if the schema is in index mode, answer entirely from the inverted index;
//  5. otherwise resolve the requested entities to series IDs, collect the
//     matching parts from each table's current snapshot, scan blocks, and
//     return a heap-merged, ordered queryResult.
//
// Reference counting: segments acquired from SelectSegments are owned by the
// returned queryResult and released via its Release; on any error after the
// result is constructed, the deferred Release below frees them. If no series
// match, segments are DecRef'ed inline before returning the nil result.
func (s *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr model.MeasureQueryResult, err error) {
	if mqo.TimeRange == nil {
		return nil, errors.New("invalid query options: timeRange are required")
	}
	if len(mqo.TagProjection) == 0 && len(mqo.FieldProjection) == 0 {
		return nil, errors.New("invalid query options: tagProjection or fieldProjection is required")
	}
	// Lazily resolve the TSDB handle and memoize it in s.tsdb for later calls.
	var tsdb storage.TSDB[*tsTable, option]
	db := s.tsdb.Load()
	if db == nil {
		tsdb, err = s.schemaRepo.loadTSDB(s.group)
		if err != nil {
			return nil, err
		}
		s.tsdb.Store(tsdb)
	} else {
		tsdb = db.(storage.TSDB[*tsTable, option])
	}
	segments, err := tsdb.SelectSegments(*mqo.TimeRange)
	if err != nil {
		return nil, err
	}
	if len(segments) < 1 {
		return nilResult, nil
	}
	if s.schema.IndexMode {
		// Index-mode measures are answered from the index alone;
		// buildIndexQueryResult takes over segment release.
		return s.buildIndexQueryResult(ctx, mqo, segments)
	}
	if len(mqo.Entities) < 1 {
		return nil, errors.New("invalid query options: series is required")
	}
	// Build one series key per requested entity.
	series := make([]*pbv1.Series, len(mqo.Entities))
	for i := range mqo.Entities {
		series[i] = &pbv1.Series{
			Subject:      mqo.Name,
			EntityValues: mqo.Entities[i],
		}
	}
	sids, tables, storedIndexValue, newTagProjection, err := s.searchSeriesList(ctx, series, mqo, segments)
	if err != nil {
		return nil, err
	}
	if len(sids) < 1 {
		// No matching series: release the segments we still own.
		for i := range segments {
			segments[i].DecRef()
		}
		return nilResult, nil
	}
	result := queryResult{
		ctx:              ctx,
		segments:         segments,
		tagProjection:    mqo.TagProjection,
		storedIndexValue: storedIndexValue,
	}
	// From here on, result owns segments (and later snapshots); free them on error.
	defer func() {
		if err != nil {
			result.Release()
		}
	}()
	// The projection is narrowed to the tags that must be read from blocks;
	// entity tags and index-stored tags are served from storedIndexValue.
	mqo.TagProjection = newTagProjection
	var parts []*part
	qo := queryOptions{
		MeasureQueryOptions: mqo,
		minTimestamp:        mqo.TimeRange.Start.UnixNano(),
		maxTimestamp:        mqo.TimeRange.End.UnixNano(),
	}
	var n int
	for i := range tables {
		s := tables[i].currentSnapshot()
		if s == nil {
			continue
		}
		parts, n = s.getParts(parts, qo.minTimestamp, qo.maxTimestamp)
		if n < 1 {
			// Snapshot contributed no parts in range: drop our reference now.
			s.decRef()
			continue
		}
		result.snapshots = append(result.snapshots, s)
	}
	if err = s.searchBlocks(ctx, &result, sids, parts, qo); err != nil {
		return nil, err
	}
	// Derive ordering flags: default is ascending by timestamp.
	if mqo.Order == nil {
		result.ascTS = true
		result.orderByTS = true
	} else {
		if mqo.Order.Sort == modelv1.Sort_SORT_ASC || mqo.Order.Sort == modelv1.Sort_SORT_UNSPECIFIED {
			result.ascTS = true
		}
		switch mqo.Order.Type {
		case index.OrderByTypeTime:
			result.orderByTS = true
		case index.OrderByTypeIndex:
			result.orderByTS = false
		case index.OrderByTypeSeries:
			result.orderByTS = false
		}
	}
	return &result, nil
}

// tagNameWithType pairs a tag (or index field) name with its value type so
// raw index bytes can be decoded back into typed tag values.
type tagNameWithType struct {
	fieldName string
	typ       pbv1.ValueType
}

// searchSeriesList resolves the requested series against every segment's
// index and classifies each projected tag into one of three sources:
//   - entity tags, served from the series' entity values
//     (recorded in projectedEntityOffsets);
//   - index-stored tags, fetched via the index projection and decoded
//     from index bytes (recorded in fieldToValueType);
//   - everything else, which must be read from data blocks and is kept in
//     newTagProjection.
//
// Returns the deduplicated series IDs, the tables of every segment that had
// matches, a per-series map of tag values already answered by the index
// (storedIndexValue), and the narrowed block-level projection.
func (s *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, mqo model.MeasureQueryOptions,
	segments []storage.Segment[*tsTable, option],
) (sl []common.SeriesID, tables []*tsTable, storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue,
	newTagProjection []model.TagProjection, err error,
) {
	var indexProjection []index.FieldKey
	fieldToValueType := make(map[string]tagNameWithType)
	var projectedEntityOffsets map[string]int
	newTagProjection = make([]model.TagProjection, 0)
	is := s.indexSchema.Load().(indexSchema)
	for _, tp := range mqo.TagProjection {
		var tagProjection model.TagProjection
	TAG:
		for _, n := range tp.Names {
			// Entity tag? Remember its offset within EntityValues.
			for i := range s.schema.GetEntity().GetTagNames() {
				if n == s.schema.GetEntity().GetTagNames()[i] {
					if projectedEntityOffsets == nil {
						projectedEntityOffsets = make(map[string]int)
					}
					projectedEntityOffsets[n] = i
					continue TAG
				}
			}
			// Index-stored tag? Project it from the index instead of blocks.
			if is.fieldIndexLocation != nil {
				if fields, ok := is.fieldIndexLocation[tp.Family]; ok {
					if field, ok := fields[n]; ok {
						indexProjection = append(indexProjection, field.Key)
						fieldToValueType[field.Key.Marshal()] = tagNameWithType{
							fieldName: n,
							typ:       field.Type,
						}
						continue TAG
					}
				}
			}
			// Otherwise the tag must come from block data.
			tagProjection.Family = tp.Family
			tagProjection.Names = append(tagProjection.Names, n)
		}
		if tagProjection.Family != "" {
			newTagProjection = append(newTagProjection, tagProjection)
		}
	}
	// seriesFilter deduplicates series IDs that appear in multiple segments.
	seriesFilter := roaring.NewPostingList()
	for i := range segments {
		sd, _, err := segments[i].IndexDB().Search(ctx, series, storage.IndexSearchOpts{
			Query:       mqo.Query,
			Order:       mqo.Order,
			PreloadSize: preloadSize,
			Projection:  indexProjection,
		})
		if err != nil {
			return nil, nil, nil, nil, err
		}
		if len(sd.SeriesList) > 0 {
			tables = append(tables, segments[i].Tables()...)
		}
		for j := range sd.SeriesList {
			if seriesFilter.Contains(uint64(sd.SeriesList[j].ID)) {
				continue
			}
			seriesFilter.Insert(uint64(sd.SeriesList[j].ID))
			sl = append(sl, sd.SeriesList[j].ID)
			if projectedEntityOffsets == nil && sd.Fields == nil {
				// Nothing is answered from the index for this query.
				continue
			}
			if storedIndexValue == nil {
				storedIndexValue = make(map[common.SeriesID]map[string]*modelv1.TagValue)
			}
			tagValues := make(map[string]*modelv1.TagValue)
			storedIndexValue[sd.SeriesList[j].ID] = tagValues
			// Entity tags come straight from the series' entity values.
			for name, offset := range projectedEntityOffsets {
				if offset < 0 || offset >= len(sd.SeriesList[j].EntityValues) {
					logger.Warningf("offset %d for tag %s is out of range for series ID %v", offset, name, sd.SeriesList[j].ID)
					tagValues[name] = pbv1.NullTagValue
					continue
				}
				tagValues[name] = sd.SeriesList[j].EntityValues[offset]
			}
			if sd.Fields == nil {
				continue
			}
			// Index-stored tags are decoded from the projected index bytes.
			for f, v := range sd.Fields[j] {
				if tnt, ok := fieldToValueType[f]; ok {
					tagValues[tnt.fieldName] = mustDecodeTagValue(tnt.typ, v)
				} else {
					logger.Panicf("unknown field %s not found in fieldToValueType", f)
				}
			}
		}
	}
	return sl, tables, storedIndexValue, newTagProjection, nil
}

// buildIndexQueryResult serves a query entirely from the inverted index for
// index-mode measures. It maps every projected tag to either an entity
// offset or an index field (erroring if a tag is in neither), searches each
// segment without touching series data, deduplicates series across segments,
// and returns an indexSortResult that heap-merges per-segment results.
// All segments are DecRef'ed before returning, on every path.
func (s *measure) buildIndexQueryResult(ctx context.Context, mqo model.MeasureQueryOptions,
	segments []storage.Segment[*tsTable, option],
) (model.MeasureQueryResult, error) {
	defer func() {
		for i := range segments {
			segments[i].DecRef()
		}
	}()
	is := s.indexSchema.Load().(indexSchema)
	r := &indexSortResult{}
	var indexProjection []index.FieldKey
	for _, tp := range mqo.TagProjection {
		tagFamilyLocation := tagFamilyLocation{
			name:                   tp.Family,
			fieldToValueType:       make(map[string]tagNameWithType),
			projectedEntityOffsets: make(map[string]int),
		}
	TAG:
		for _, n := range tp.Names {
			tagFamilyLocation.tagNames = append(tagFamilyLocation.tagNames, n)
			// Entity tag: record its offset within EntityValues.
			for i := range s.schema.GetEntity().GetTagNames() {
				if n == s.schema.GetEntity().GetTagNames()[i] {
					tagFamilyLocation.projectedEntityOffsets[n] = i
					continue TAG
				}
			}
			// Index-stored tag: record its key and value type for decoding.
			if is.fieldIndexLocation != nil {
				if fields, ok := is.fieldIndexLocation[tp.Family]; ok {
					if field, ok := fields[n]; ok {
						indexProjection = append(indexProjection, field.Key)
						tagFamilyLocation.fieldToValueType[n] = tagNameWithType{
							fieldName: field.Key.Marshal(),
							typ:       field.Type,
						}
						continue TAG
					}
				}
			}
			// In index mode every projected tag must be resolvable here.
			return nil, fmt.Errorf("tag %s not found in schema", n)
		}
		r.tfl = append(r.tfl, tagFamilyLocation)
	}
	var err error
	opts := storage.IndexSearchOpts{
		Query:       mqo.Query,
		Order:       mqo.Order,
		PreloadSize: preloadSize,
		Projection:  indexProjection,
	}
	seriesFilter := roaring.NewPostingList()
	for i := range segments {
		// Skip the per-document time filter when the query range fully
		// covers the segment.
		if mqo.TimeRange.Include(segments[i].GetTimeRange()) {
			opts.TimeRange = nil
		} else {
			opts.TimeRange = mqo.TimeRange
		}
		sr := &segResult{}
		sr.SeriesData, sr.sortedValues, err = segments[i].IndexDB().SearchWithoutSeries(ctx, opts)
		if err != nil {
			return nil, err
		}
		// Drop series already seen in earlier segments (in-place removal,
		// hence the index walk with j-- after remove).
		for j := 0; j < len(sr.SeriesList); j++ {
			if seriesFilter.Contains(uint64(sr.SeriesList[j].ID)) {
				sr.remove(j)
				j--
				continue
			}
			seriesFilter.Insert(uint64(sr.SeriesList[j].ID))
		}
		if len(sr.SeriesList) < 1 {
			continue
		}
		r.segResults = append(r.segResults, sr)
	}
	if len(r.segResults) < 1 {
		return nilResult, nil
	}
	heap.Init(&r.segResults)
	return r, nil
}

// searchBlocks scans the given parts for blocks belonging to sids within the
// query's time window and appends one blockCursor per matching block to
// result.data. sids are sorted (on a copy-preserved original) because the
// block iterator requires ordered series IDs, while sidToIndex preserves the
// caller's original order for index-ordered merging. Resource admission is
// requested for the total uncompressed block size before returning.
func (s *measure) searchBlocks(ctx context.Context, result *queryResult, sids []common.SeriesID, parts []*part, qo queryOptions) error {
	bma := generateBlockMetadataArray()
	defer releaseBlockMetadataArray(bma)
	defFn := startBlockScanSpan(ctx, len(sids), parts, result)
	defer defFn()
	tstIter := generateTstIter()
	defer releaseTstIter(tstIter)
	// Keep the caller's ordering before sorting sids for iteration.
	originalSids := make([]common.SeriesID, len(sids))
	copy(originalSids, sids)
	sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] })
	tstIter.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp)
	if tstIter.Error() != nil {
		return fmt.Errorf("cannot init tstIter: %w", tstIter.Error())
	}
	var hit int
	var totalBlockBytes uint64
	for tstIter.nextBlock() {
		// Poll for cancellation periodically rather than on every block.
		if hit%checkDoneEvery == 0 {
			select {
			case <-ctx.Done():
				return errors.WithMessagef(ctx.Err(), "interrupt: scanned %d blocks, remained %d/%d parts to scan", len(result.data), len(tstIter.piHeap), len(tstIter.piPool))
			default:
			}
		}
		hit++
		bc := generateBlockCursor()
		p := tstIter.piHeap[0]
		bc.init(p.p, p.curBlock, qo)
		result.data = append(result.data, bc)
		totalBlockBytes += bc.bm.uncompressedSizeBytes
	}
	if tstIter.Error() != nil {
		return fmt.Errorf("cannot iterate tstIter: %w", tstIter.Error())
	}
	// Ask the protection manager for quota covering the bytes we will load.
	if err := s.pm.AcquireResource(ctx, totalBlockBytes); err != nil {
		return err
	}
	result.sidToIndex = make(map[common.SeriesID]int)
	for i, si := range originalSids {
		result.sidToIndex[si] = i
	}
	return nil
}

// mustDecodeTagValue decodes raw index bytes into a typed TagValue.
// A nil value yields NullTagValue; an unsupported type panics (programmer
// error: the type was validated when the schema was built).
func mustDecodeTagValue(valueType pbv1.ValueType, value []byte) *modelv1.TagValue {
	if value == nil {
		return pbv1.NullTagValue
	}
	switch valueType {
	case pbv1.ValueTypeInt64:
		return int64TagValue(convert.BytesToInt64(value))
	case pbv1.ValueTypeStr:
		return strTagValue(string(value))
	case pbv1.ValueTypeBinaryData:
		return binaryDataTagValue(value)
	case pbv1.ValueTypeInt64Arr:
		// Fixed-width encoding: 8 bytes per int64.
		var values []int64
		for i := 0; i < len(value); i += 8 {
			values = append(values, convert.BytesToInt64(value[i:i+8]))
		}
		return int64ArrTagValue(values)
	case pbv1.ValueTypeStrArr:
		// Var-length encoding: repeatedly unmarshal one element into a
		// pooled scratch buffer until the input is consumed.
		var values []string
		bb := bigValuePool.Generate()
		var err error
		for len(value) > 0 {
			bb.Buf, value, err = unmarshalVarArray(bb.Buf[:0], value)
			if err != nil {
				logger.Panicf("unmarshalVarArray failed: %v", err)
			}
			values = append(values, string(bb.Buf))
		}
		return strArrTagValue(values)
	default:
		logger.Panicf("unsupported value type: %v", valueType)
		return nil
	}
}

// int64TagValue wraps an int64 in a TagValue.
func int64TagValue(value int64) *modelv1.TagValue {
	return &modelv1.TagValue{
		Value: &modelv1.TagValue_Int{
			Int: &modelv1.Int{
				Value: value,
			},
		},
	}
}

// strTagValue wraps a string in a TagValue.
func strTagValue(value string) *modelv1.TagValue {
	return &modelv1.TagValue{
		Value: &modelv1.TagValue_Str{
			Str: &modelv1.Str{
				Value: value,
			},
		},
	}
}

// binaryDataTagValue copies value (so the caller's buffer may be reused)
// and wraps it in a TagValue.
func binaryDataTagValue(value []byte) *modelv1.TagValue {
	data := make([]byte, len(value))
	copy(data, value)
	return &modelv1.TagValue{
		Value: &modelv1.TagValue_BinaryData{
			BinaryData: data,
		},
	}
}

// int64ArrTagValue wraps an int64 slice in a TagValue.
func int64ArrTagValue(values []int64) *modelv1.TagValue {
	return &modelv1.TagValue{
		Value: &modelv1.TagValue_IntArray{
			IntArray: &modelv1.IntArray{
				Value: values,
			},
		},
	}
}

// strArrTagValue wraps a string slice in a TagValue.
func strArrTagValue(values []string) *modelv1.TagValue {
	return &modelv1.TagValue{
		Value: &modelv1.TagValue_StrArray{
			StrArray: &modelv1.StrArray{
				Value: values,
			},
		},
	}
}

// mustDecodeFieldValue decodes raw field bytes into a typed FieldValue.
// nil maps to an empty value for string/binary types and to NullFieldValue
// otherwise; an unsupported type panics.
func mustDecodeFieldValue(valueType pbv1.ValueType, value []byte) *modelv1.FieldValue {
	if value == nil {
		switch valueType {
		case pbv1.ValueTypeStr:
			return pbv1.EmptyStrFieldValue
		case pbv1.ValueTypeBinaryData:
			return pbv1.EmptyBinaryFieldValue
		default:
			return pbv1.NullFieldValue
		}
	}
	switch valueType {
	case pbv1.ValueTypeInt64:
		return int64FieldValue(convert.BytesToInt64(value))
	case pbv1.ValueTypeFloat64:
		return float64FieldValue(convert.BytesToFloat64(value))
	case pbv1.ValueTypeStr:
		return strFieldValue(string(value))
	case pbv1.ValueTypeBinaryData:
		return binaryDataFieldValue(value)
	default:
		logger.Panicf("unsupported value type: %v", valueType)
		return nil
	}
}

// int64FieldValue wraps an int64 in a FieldValue.
func int64FieldValue(value int64) *modelv1.FieldValue {
	return &modelv1.FieldValue{
		Value: &modelv1.FieldValue_Int{
			Int: &modelv1.Int{
				Value: value,
			},
		},
	}
}

// float64FieldValue wraps a float64 in a FieldValue.
func float64FieldValue(value float64) *modelv1.FieldValue {
	return &modelv1.FieldValue{
		Value: &modelv1.FieldValue_Float{
			Float: &modelv1.Float{
				Value: value,
			},
		},
	}
}

// strFieldValue wraps a string in a FieldValue.
func strFieldValue(value string) *modelv1.FieldValue {
	return &modelv1.FieldValue{
		Value: &modelv1.FieldValue_Str{
			Str: &modelv1.Str{
				Value: value,
			},
		},
	}
}

// binaryDataFieldValue copies value (so the caller's buffer may be reused)
// and wraps it in a FieldValue.
func binaryDataFieldValue(value []byte) *modelv1.FieldValue {
	data := make([]byte, len(value))
	copy(data, value)
	return &modelv1.FieldValue{
		Value: &modelv1.FieldValue_BinaryData{
			BinaryData: data,
		},
	}
}

// queryResult is a pull-based result that merges block cursors with a heap.
// It owns references to the snapshots and segments it reads from and
// releases them in Release. It implements heap.Interface over data, ordered
// either by timestamp (orderByTS) or by the caller's series order
// (sidToIndex).
type queryResult struct {
	ctx              context.Context
	sidToIndex       map[common.SeriesID]int
	storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue
	tagProjection    []model.TagProjection
	data             []*blockCursor
	snapshots        []*snapshot
	segments         []storage.Segment[*tsTable, option]
	hit              int
	loaded           bool
	orderByTS        bool
	ascTS            bool
}

// Pull returns the next merged MeasureResult, or nil when exhausted.
// On the first call it loads every cursor's block concurrently (one
// goroutine per cursor, rendezvousing on cursorChan), removes cursors whose
// blocks yielded no data, and heap-initializes the remainder.
func (qr *queryResult) Pull() *model.MeasureResult {
	select {
	case <-qr.ctx.Done():
		return &model.MeasureResult{
			Error: errors.WithMessagef(qr.ctx.Err(), "interrupt: hit %d", qr.hit),
		}
	default:
	}
	if !qr.loaded {
		if len(qr.data) == 0 {
			return nil
		}
		// Each goroutine sends its cursor index on failure/cancellation,
		// or -1 on success, so the receive loop can collect blanks.
		cursorChan := make(chan int, len(qr.data))
		for i := 0; i < len(qr.data); i++ {
			go func(i int) {
				select {
				case <-qr.ctx.Done():
					cursorChan <- i
					return
				default:
				}
				tmpBlock := generateBlock()
				defer releaseBlock(tmpBlock)
				if !qr.data[i].loadData(tmpBlock) {
					cursorChan <- i
					return
				}
				// For descending scans start each cursor at its last row.
				if qr.orderByTimestampDesc() {
					qr.data[i].idx = len(qr.data[i].timestamps) - 1
				}
				cursorChan <- -1
			}(i)
		}
		blankCursorList := []int{}
		for completed := 0; completed < len(qr.data); completed++ {
			result := <-cursorChan
			if result != -1 {
				blankCursorList = append(blankCursorList, result)
			}
		}
		select {
		case <-qr.ctx.Done():
			return &model.MeasureResult{
				Error: errors.WithMessagef(qr.ctx.Err(), "interrupt: blank/total=%d/%d", len(blankCursorList), len(qr.data)),
			}
		default:
		}
		// Remove blanks from highest index to lowest so earlier removals
		// don't shift the positions of later ones.
		sort.Slice(blankCursorList, func(i, j int) bool {
			return blankCursorList[i] > blankCursorList[j]
		})
		for _, index := range blankCursorList {
			qr.data = append(qr.data[:index], qr.data[index+1:]...)
		}
		qr.loaded = true
		heap.Init(qr)
	}
	if len(qr.data) == 0 {
		return nil
	}
	if len(qr.data) == 1 {
		// Single cursor: copy everything out in one shot, no merge needed.
		r := &model.MeasureResult{}
		bc := qr.data[0]
		bc.copyAllTo(r, qr.storedIndexValue, qr.tagProjection, qr.orderByTimestampDesc())
		qr.data = qr.data[:0]
		releaseBlockCursor(bc)
		return r
	}
	return qr.merge(qr.storedIndexValue, qr.tagProjection)
}

// Release returns all block cursors to their pool and drops the references
// held on snapshots and segments.
func (qr *queryResult) Release() {
	for i, v := range qr.data {
		releaseBlockCursor(v)
		qr.data[i] = nil
	}
	qr.data = qr.data[:0]
	for i := range qr.snapshots {
		qr.snapshots[i].decRef()
	}
	qr.snapshots = qr.snapshots[:0]
	for i := range qr.segments {
		qr.segments[i].DecRef()
	}
}

// Len implements heap.Interface.
func (qr queryResult) Len() int {
	return len(qr.data)
}

// Less implements heap.Interface. When ordering by timestamp, ties break on
// seriesID ascending and then version descending; when ordering by series,
// the caller's original series order (sidToIndex) dominates, then timestamp
// ascending, then version descending.
func (qr queryResult) Less(i, j int) bool {
	leftTS := qr.data[i].timestamps[qr.data[i].idx]
	rightTS := qr.data[j].timestamps[qr.data[j].idx]
	leftVersion := qr.data[i].versions[qr.data[i].idx]
	rightVersion := qr.data[j].versions[qr.data[j].idx]
	if qr.orderByTS {
		if leftTS == rightTS {
			if qr.data[i].bm.seriesID == qr.data[j].bm.seriesID {
				// sort version in descending order if timestamps and seriesID are equal
				return leftVersion > rightVersion
			}
			// sort seriesID in ascending order if timestamps are equal
			return qr.data[i].bm.seriesID < qr.data[j].bm.seriesID
		}
		if qr.ascTS {
			return leftTS < rightTS
		}
		return leftTS > rightTS
	}
	leftSIDIndex := qr.sidToIndex[qr.data[i].bm.seriesID]
	rightSIDIndex := qr.sidToIndex[qr.data[j].bm.seriesID]
	if leftSIDIndex == rightSIDIndex {
		if leftTS == rightTS {
			// sort version in descending order if timestamps and seriesID are equal
			return leftVersion > rightVersion
		}
		// sort timestamps in ascending order if seriesID are equal
		return leftTS < rightTS
	}
	return leftSIDIndex < rightSIDIndex
}

// Swap implements heap.Interface.
func (qr queryResult) Swap(i, j int) {
	qr.data[i], qr.data[j] = qr.data[j], qr.data[i]
}

// Push implements heap.Interface.
func (qr *queryResult) Push(x interface{}) {
	qr.data = append(qr.data, x.(*blockCursor))
}

// Pop implements heap.Interface. The popped cursor is exhausted at this
// point, so it is returned to the pool before being handed back.
func (qr *queryResult) Pop() interface{} {
	old := qr.data
	n := len(old)
	x := old[n-1]
	qr.data = old[0 : n-1]
	releaseBlockCursor(x)
	return x
}

// orderByTimestampDesc reports whether rows are emitted newest-first.
func (qr *queryResult) orderByTimestampDesc() bool {
	return qr.orderByTS && !qr.ascTS
}

// merge drains the cursor heap into a single MeasureResult for one series:
// it stops (returning the partial result) as soon as the top cursor belongs
// to a different series. Duplicate timestamps are resolved by keeping the
// row with the highest version (replace). Cursors advance by +1 or -1
// depending on scan direction and are popped once exhausted, otherwise the
// heap is re-fixed at the root.
func (qr *queryResult) merge(storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue,
	tagProjection []model.TagProjection,
) *model.MeasureResult {
	step := 1
	if qr.orderByTimestampDesc() {
		step = -1
	}
	result := &model.MeasureResult{}
	var lastVersion int64
	var lastSid common.SeriesID
	for qr.Len() > 0 {
		topBC := qr.data[0]
		if lastSid != 0 && topBC.bm.seriesID != lastSid {
			// Next series reached: emit what we have for the current one.
			return result
		}
		lastSid = topBC.bm.seriesID
		if len(result.Timestamps) > 0 && topBC.timestamps[topBC.idx] == result.Timestamps[len(result.Timestamps)-1] {
			// Same timestamp as the last emitted row: keep the newer version.
			if topBC.versions[topBC.idx] > lastVersion {
				topBC.replace(result, storedIndexValue)
			}
		} else {
			topBC.copyTo(result, storedIndexValue, tagProjection)
			lastVersion = topBC.versions[topBC.idx]
		}
		topBC.idx += step
		if qr.orderByTimestampDesc() {
			if topBC.idx < 0 {
				heap.Pop(qr)
			} else {
				heap.Fix(qr, 0)
			}
		} else {
			if topBC.idx >= len(topBC.timestamps) {
				heap.Pop(qr)
			} else {
				heap.Fix(qr, 0)
			}
		}
	}
	return result
}

// indexSortResult merges per-segment index search results (segResults, a
// heap) into a globally ordered stream, decoding projected tags via the
// per-family locations in tfl.
type indexSortResult struct {
	tfl        []tagFamilyLocation
	segResults segResultHeap
}

// Pull implements model.MeasureQueryResult.
// Pull returns the next result in global order, or nil when exhausted.
// With a single remaining segment it iterates that segment directly; with
// several it pops the smallest entry, emits it, advances that segment's
// cursor (sr.i), and pushes the segment back while it has rows left.
func (iqr *indexSortResult) Pull() *model.MeasureResult {
	if len(iqr.segResults) < 1 {
		return nil
	}
	if len(iqr.segResults) == 1 {
		if iqr.segResults[0].i >= len(iqr.segResults[0].SeriesList) {
			return nil
		}
		sr := iqr.segResults[0]
		r := iqr.copyTo(sr)
		sr.i++
		if sr.i >= len(sr.SeriesList) {
			iqr.segResults = iqr.segResults[:0]
		}
		return r
	}
	top := heap.Pop(&iqr.segResults)
	sr := top.(*segResult)
	r := iqr.copyTo(sr)
	sr.i++
	if sr.i < len(sr.SeriesList) {
		heap.Push(&iqr.segResults, sr)
	}
	return r
}

// Release implements model.MeasureQueryResult; index results hold no
// references that need freeing.
func (iqr *indexSortResult) Release() {}

// copyTo materializes the row at src's current cursor (src.i) into a
// MeasureResult. For each projected tag family, entity tags are read from
// the series' EntityValues at their recorded offsets, and index-stored tags
// are decoded from the segment's field result; tags with no field result
// available are skipped, and an unknown tag name panics (it was validated
// in buildIndexQueryResult).
func (iqr *indexSortResult) copyTo(src *segResult) (dest *model.MeasureResult) {
	index := src.i
	dest = &model.MeasureResult{
		SID:        src.SeriesList[index].ID,
		Timestamps: []int64{src.Timestamps[index]},
		Versions:   []int64{src.Versions[index]},
	}
	for i := range iqr.tfl {
		tagFamily := model.TagFamily{Name: iqr.tfl[i].name}
		peo := iqr.tfl[i].projectedEntityOffsets
		var fr storage.FieldResult
		if src.Fields != nil {
			fr = src.Fields[index]
		}
		for _, n := range iqr.tfl[i].tagNames {
			if offset, ok := peo[n]; ok {
				// Entity tag: served straight from the entity values.
				tagFamily.Tags = append(tagFamily.Tags, model.Tag{
					Name:   n,
					Values: []*modelv1.TagValue{src.SeriesList[index].EntityValues[offset]},
				})
				continue
			}
			if fr == nil {
				continue
			}
			if tnt, ok := iqr.tfl[i].fieldToValueType[n]; ok {
				// Index-stored tag: decode the projected bytes.
				tagFamily.Tags = append(tagFamily.Tags, model.Tag{
					Name:   n,
					Values: []*modelv1.TagValue{mustDecodeTagValue(tnt.typ, fr[tnt.fieldName])},
				})
			} else {
				logger.Panicf("unknown field %s not found in fieldToValueType", n)
			}
		}
		dest.TagFamilies = append(dest.TagFamilies, tagFamily)
	}
	return dest
}

// tagFamilyLocation records, for one projected tag family, where each tag's
// value comes from: fieldToValueType for index-stored tags and
// projectedEntityOffsets for entity tags. tagNames preserves projection order.
type tagFamilyLocation struct {
	fieldToValueType       map[string]tagNameWithType
	projectedEntityOffsets map[string]int
	name                   string
	tagNames               []string
}

// segResult is one segment's index search output plus a cursor i into its
// parallel slices; sortedValues (optional) carries the per-row sort keys.
type segResult struct {
	storage.SeriesData
	sortedValues [][]byte
	i            int
}

// remove deletes row i from every parallel slice, keeping them aligned.
func (sr *segResult) remove(i int) {
	sr.SeriesList = append(sr.SeriesList[:i], sr.SeriesList[i+1:]...)
	if sr.Fields != nil {
		sr.Fields = append(sr.Fields[:i], sr.Fields[i+1:]...)
	}
	sr.Timestamps = append(sr.Timestamps[:i], sr.Timestamps[i+1:]...)
	sr.Versions = append(sr.Versions[:i], sr.Versions[i+1:]...)
	if sr.sortedValues != nil {
		sr.sortedValues = append(sr.sortedValues[:i], sr.sortedValues[i+1:]...)
	}
}

// segResultHeap orders segment results by each segment's current row:
// by sort key bytes when present, otherwise by series ID.
type segResultHeap []*segResult

// Len implements heap.Interface.
func (h segResultHeap) Len() int { return len(h) }

// Less implements heap.Interface; see segResultHeap for the ordering rule.
func (h segResultHeap) Less(i, j int) bool {
	if h[i].sortedValues == nil {
		return h[i].SeriesList[h[i].i].ID < h[j].SeriesList[h[j].i].ID
	}
	return bytes.Compare(h[i].sortedValues[h[i].i], h[j].sortedValues[h[j].i]) < 0
}

// Swap implements heap.Interface.
func (h segResultHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

// Push implements heap.Interface.
func (h *segResultHeap) Push(x interface{}) {
	*h = append(*h, x.(*segResult))
}

// Pop implements heap.Interface.
func (h *segResultHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[0 : n-1]
	return x
}