banyand/stream/query.go (388 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package stream
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/api/common"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
itersort "github.com/apache/skywalking-banyandb/pkg/iter/sort"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
logicalstream "github.com/apache/skywalking-banyandb/pkg/query/logical/stream"
"github.com/apache/skywalking-banyandb/pkg/query/model"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
const checkDoneEvery = 128
func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr model.StreamQueryResult, err error) {
if err = validateQueryInput(sqo); err != nil {
return nil, err
}
tsdb, err := s.getTSDB()
if err != nil {
return nil, err
}
segments, err := tsdb.SelectSegments(*sqo.TimeRange)
if err != nil {
return nil, err
}
if len(segments) < 1 {
return bypassQueryResultInstance, nil
}
defer func() {
if err != nil {
sqr.Release()
}
}()
series := prepareSeriesData(sqo)
qo := prepareQueryOptions(sqo)
tr := index.NewIntRangeOpts(qo.minTimestamp, qo.maxTimestamp, true, true)
if sqo.Order == nil || sqo.Order.Index == nil {
return s.executeTimeSeriesQuery(segments, series, qo, &tr), nil
}
return s.executeIndexedQuery(ctx, segments, series, sqo, &tr)
}
func validateQueryInput(sqo model.StreamQueryOptions) error {
if sqo.TimeRange == nil || len(sqo.Entities) < 1 {
return errors.New("invalid query options: timeRange and series are required")
}
if len(sqo.TagProjection) == 0 {
return errors.New("invalid query options: tagProjection is required")
}
return nil
}
func (s *stream) getTSDB() (storage.TSDB[*tsTable, option], error) {
var tsdb storage.TSDB[*tsTable, option]
db := s.tsdb.Load()
if db == nil {
var err error
tsdb, err = s.schemaRepo.loadTSDB(s.group)
if err != nil {
return nil, err
}
s.tsdb.Store(tsdb)
} else {
tsdb = db.(storage.TSDB[*tsTable, option])
}
return tsdb, nil
}
func prepareSeriesData(sqo model.StreamQueryOptions) []*pbv1.Series {
series := make([]*pbv1.Series, len(sqo.Entities))
for i := range sqo.Entities {
series[i] = &pbv1.Series{
Subject: sqo.Name,
EntityValues: sqo.Entities[i],
}
}
return series
}
func prepareQueryOptions(sqo model.StreamQueryOptions) queryOptions {
return queryOptions{
StreamQueryOptions: sqo,
minTimestamp: sqo.TimeRange.Start.UnixNano(),
maxTimestamp: sqo.TimeRange.End.UnixNano(),
}
}
func (s *stream) executeTimeSeriesQuery(
segments []storage.Segment[*tsTable, option],
series []*pbv1.Series,
qo queryOptions,
tr *index.RangeOpts,
) model.StreamQueryResult {
result := &tsResult{
segments: segments,
series: series,
qo: qo,
sm: s,
pm: s.pm,
l: s.l,
tr: tr,
}
// Determine ascending order
if qo.Order == nil {
result.asc = true
} else if qo.Order.Sort == modelv1.Sort_SORT_ASC || qo.Order.Sort == modelv1.Sort_SORT_UNSPECIFIED {
result.asc = true
}
return result
}
func (s *stream) executeIndexedQuery(
ctx context.Context,
segments []storage.Segment[*tsTable, option],
series []*pbv1.Series,
sqo model.StreamQueryOptions,
tr *index.RangeOpts,
) (model.StreamQueryResult, error) {
result, seriesFilter, resultTS, err := s.processSegmentsAndBuildFilters(ctx, segments, series, sqo, tr)
if err != nil {
return nil, err
}
if seriesFilter.IsEmpty() {
result.Release()
return nil, nil
}
// Update time range if needed
sids := seriesFilter.ToSlice()
startTS := sqo.TimeRange.Start.UnixNano()
endTS := sqo.TimeRange.End.UnixNano()
minTS, maxTS := updateTimeRange(resultTS, startTS, endTS)
if minTS > startTS || maxTS < endTS {
newTR := timestamp.NewTimeRange(time.Unix(0, minTS), time.Unix(0, maxTS), sqo.TimeRange.IncludeStart, sqo.TimeRange.IncludeEnd)
sqo.TimeRange = &newTR
}
// Perform index-based sorting
if result.sortingIter, err = s.indexSort(ctx, sqo, result.tabs, sids); err != nil {
return nil, err
}
// Set ascending flag
if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort == modelv1.Sort_SORT_UNSPECIFIED {
result.asc = true
}
return &result, nil
}
func (s *stream) processSegmentsAndBuildFilters(
ctx context.Context,
segments []storage.Segment[*tsTable, option],
series []*pbv1.Series,
sqo model.StreamQueryOptions,
tr *index.RangeOpts,
) (idxResult, posting.List, posting.List, error) {
var result idxResult
result.pm = s.pm
result.segments = segments
result.sm = s
result.qo = queryOptions{
StreamQueryOptions: sqo,
seriesToEntity: make(map[common.SeriesID][]*modelv1.TagValue),
}
seriesFilter := roaring.NewPostingList()
var resultTS posting.List
var sl pbv1.SeriesList
var err error
for i := range result.segments {
sl, err = result.segments[i].Lookup(ctx, series)
if err != nil {
return result, nil, nil, err
}
var filter, filterTS posting.List
if filter, filterTS, err = indexSearch(ctx, sqo, segments[i].Tables(), sl.ToList().ToSlice(), tr); err != nil {
return result, nil, nil, err
}
if filter != nil && filter.IsEmpty() {
continue
}
if result.qo.elementFilter == nil {
result.qo.elementFilter = filter
resultTS = filterTS
} else {
if err = result.qo.elementFilter.Union(filter); err != nil {
return result, nil, nil, err
}
if err = resultTS.Union(filterTS); err != nil {
return result, nil, nil, err
}
}
for j := range sl {
if seriesFilter.Contains(uint64(sl[j].ID)) {
continue
}
seriesFilter.Insert(uint64(sl[j].ID))
result.qo.seriesToEntity[sl[j].ID] = sl[j].EntityValues
}
result.tabs = append(result.tabs, result.segments[i].Tables()...)
}
return result, seriesFilter, resultTS, nil
}
type queryOptions struct {
elementFilter posting.List
seriesToEntity map[common.SeriesID][]*modelv1.TagValue
sortedSids []common.SeriesID
model.StreamQueryOptions
minTimestamp int64
maxTimestamp int64
}
func (qo *queryOptions) reset() {
qo.StreamQueryOptions.Reset()
qo.elementFilter = nil
qo.seriesToEntity = nil
qo.sortedSids = nil
qo.minTimestamp = 0
qo.maxTimestamp = 0
}
func (qo *queryOptions) copyFrom(other *queryOptions) {
qo.StreamQueryOptions.CopyFrom(&other.StreamQueryOptions)
qo.elementFilter = other.elementFilter
qo.seriesToEntity = other.seriesToEntity
qo.sortedSids = other.sortedSids
qo.minTimestamp = other.minTimestamp
qo.maxTimestamp = other.maxTimestamp
}
func indexSearch(ctx context.Context, sqo model.StreamQueryOptions,
tabs []*tsTable, seriesList []uint64, tr *index.RangeOpts,
) (posting.List, posting.List, error) {
if sqo.Filter == nil || sqo.Filter == logicalstream.ENode {
return nil, nil, nil
}
result, resultTS := roaring.NewPostingList(), roaring.NewPostingList()
for _, tw := range tabs {
index := tw.Index()
pl, plTS, err := index.Search(ctx, seriesList, sqo.Filter, tr)
if err != nil {
return nil, nil, err
}
if pl == nil || pl.IsEmpty() {
continue
}
if err := result.Union(pl); err != nil {
return nil, nil, err
}
if plTS == nil || plTS.IsEmpty() {
continue
}
if err := resultTS.Union(plTS); err != nil {
return nil, nil, err
}
}
return result, resultTS, nil
}
func (s *stream) indexSort(ctx context.Context, sqo model.StreamQueryOptions, tabs []*tsTable,
sids []uint64,
) (itersort.Iterator[*index.DocumentResult], error) {
if sqo.Order == nil || sqo.Order.Index == nil {
return nil, nil
}
seriesList := make([]common.SeriesID, len(sids))
for i := range sids {
seriesList[i] = common.SeriesID(sids[i])
}
iters, err := s.buildItersByIndex(ctx, tabs, seriesList, sqo)
if err != nil {
return nil, err
}
desc := sqo.Order != nil && sqo.Order.Index == nil && sqo.Order.Sort == modelv1.Sort_SORT_DESC
return itersort.NewItemIter[*index.DocumentResult](iters, desc), nil
}
func (s *stream) buildItersByIndex(ctx context.Context, tables []*tsTable,
sids []common.SeriesID, sqo model.StreamQueryOptions,
) (iters []itersort.Iterator[*index.DocumentResult], err error) {
indexRuleForSorting := sqo.Order.Index
if len(indexRuleForSorting.Tags) != 1 {
return nil, fmt.Errorf("only support one tag for sorting, but got %d", len(indexRuleForSorting.Tags))
}
for _, tw := range tables {
var iter index.FieldIterator[*index.DocumentResult]
fieldKey := index.FieldKey{
IndexRuleID: indexRuleForSorting.GetMetadata().GetId(),
Analyzer: indexRuleForSorting.GetAnalyzer(),
}
iter, err = tw.Index().Sort(ctx, sids, fieldKey, sqo.Order.Sort, sqo.TimeRange, sqo.MaxElementSize)
if err != nil {
return nil, err
}
iters = append(iters, iter)
}
return
}
func mustEncodeTagValue(name string, tagType databasev1.TagType, tagValue *modelv1.TagValue, num int) [][]byte {
values := make([][]byte, num)
tv := encodeTagValue(name, tagType, tagValue)
defer releaseTagValue(tv)
value := tv.marshal()
for i := 0; i < num; i++ {
values[i] = value
}
return values
}
func mustDecodeTagValue(valueType pbv1.ValueType, value []byte) *modelv1.TagValue {
if value == nil {
return pbv1.NullTagValue
}
switch valueType {
case pbv1.ValueTypeInt64:
return int64TagValue(convert.BytesToInt64(value))
case pbv1.ValueTypeStr:
return strTagValue(string(value))
case pbv1.ValueTypeBinaryData:
return binaryDataTagValue(value)
case pbv1.ValueTypeInt64Arr:
var values []int64
for i := 0; i < len(value); i += 8 {
values = append(values, convert.BytesToInt64(value[i:i+8]))
}
return int64ArrTagValue(values)
case pbv1.ValueTypeStrArr:
var values []string
bb := bigValuePool.Generate()
defer bigValuePool.Release(bb)
var err error
for len(value) > 0 {
bb.Buf, value, err = unmarshalVarArray(bb.Buf[:0], value)
if err != nil {
logger.Panicf("unmarshalVarArray failed: %v", err)
}
values = append(values, string(bb.Buf))
}
return strArrTagValue(values)
default:
logger.Panicf("unsupported value type: %v", valueType)
return nil
}
}
func int64TagValue(value int64) *modelv1.TagValue {
return &modelv1.TagValue{
Value: &modelv1.TagValue_Int{
Int: &modelv1.Int{
Value: value,
},
},
}
}
func strTagValue(value string) *modelv1.TagValue {
return &modelv1.TagValue{
Value: &modelv1.TagValue_Str{
Str: &modelv1.Str{
Value: value,
},
},
}
}
func binaryDataTagValue(value []byte) *modelv1.TagValue {
data := make([]byte, len(value))
copy(data, value)
return &modelv1.TagValue{
Value: &modelv1.TagValue_BinaryData{
BinaryData: data,
},
}
}
func int64ArrTagValue(values []int64) *modelv1.TagValue {
return &modelv1.TagValue{
Value: &modelv1.TagValue_IntArray{
IntArray: &modelv1.IntArray{
Value: values,
},
},
}
}
func strArrTagValue(values []string) *modelv1.TagValue {
return &modelv1.TagValue{
Value: &modelv1.TagValue_StrArray{
StrArray: &modelv1.StrArray{
Value: values,
},
},
}
}
func updateTimeRange(filterTS posting.List, minTimestamp, maxTimestamp int64) (int64, int64) {
if filterTS != nil && !filterTS.IsEmpty() {
if minTS, err := filterTS.Min(); err == nil && int64(minTS) > minTimestamp {
minTimestamp = int64(minTS)
}
if maxTS, err := filterTS.Max(); err == nil && int64(maxTS) < maxTimestamp {
maxTimestamp = int64(maxTS)
}
}
return minTimestamp, maxTimestamp
}