pkg/query/logical/measure/measure_plan_distributed.go (341 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package measure import ( "bytes" "container/list" "context" "fmt" "time" "go.uber.org/multierr" "google.golang.org/protobuf/proto" "github.com/apache/skywalking-banyandb/api/data" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/iter/sort" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/query" "github.com/apache/skywalking-banyandb/pkg/query/executor" "github.com/apache/skywalking-banyandb/pkg/query/logical" ) const defaultQueryTimeout = 15 * time.Second var _ logical.UnresolvedPlan = (*unresolvedDistributed)(nil) type unresolvedDistributed struct { originalQuery *measurev1.QueryRequest groupByEntity bool } func newUnresolvedDistributed(query *measurev1.QueryRequest) logical.UnresolvedPlan { return &unresolvedDistributed{ originalQuery: query, } } func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error) { projectionTags := logical.ToTags(ud.originalQuery.GetTagProjection()) if len(projectionTags) > 0 { var err error projTagsRefs, err := s.CreateTagRef(projectionTags...) if err != nil { return nil, err } s = s.ProjTags(projTagsRefs...) } projectionFields := make([]*logical.Field, len(ud.originalQuery.GetFieldProjection().GetNames())) for i, fieldNameProj := range ud.originalQuery.GetFieldProjection().GetNames() { projectionFields[i] = logical.NewField(fieldNameProj) } if len(projectionFields) > 0 { var err error projFieldRefs, err := s.CreateFieldRef(projectionFields...) if err != nil { return nil, err } s = s.ProjFields(projFieldRefs...) } limit := ud.originalQuery.GetLimit() if limit == 0 { limit = defaultLimit } temp := &measurev1.QueryRequest{ TagProjection: ud.originalQuery.TagProjection, FieldProjection: ud.originalQuery.FieldProjection, Name: ud.originalQuery.Name, Groups: ud.originalQuery.Groups, Criteria: ud.originalQuery.Criteria, Limit: limit + ud.originalQuery.Offset, OrderBy: ud.originalQuery.OrderBy, } if ud.groupByEntity { e := s.EntityList()[0] sortTagSpec := s.FindTagSpecByName(e) if sortTagSpec == nil { return nil, fmt.Errorf("entity tag %s not found", e) } result := &distributedPlan{ queryTemplate: temp, s: s, sortByTime: false, sortTagSpec: *sortTagSpec, } if ud.originalQuery.OrderBy != nil && ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC { result.desc = true } return result, nil } if ud.originalQuery.OrderBy == nil { return &distributedPlan{ queryTemplate: temp, s: s, sortByTime: true, }, nil } if ud.originalQuery.OrderBy.IndexRuleName == "" { result := &distributedPlan{ queryTemplate: temp, s: s, sortByTime: true, } if ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC { result.desc = true } return result, nil } ok, indexRule := s.IndexRuleDefined(ud.originalQuery.OrderBy.IndexRuleName) if !ok { return nil, fmt.Errorf("index rule %s not found", ud.originalQuery.OrderBy.IndexRuleName) } if len(indexRule.Tags) != 1 { return nil, fmt.Errorf("index rule %s should have only one tag", ud.originalQuery.OrderBy.IndexRuleName) } sortTagSpec := s.FindTagSpecByName(indexRule.Tags[0]) if sortTagSpec == nil { return nil, fmt.Errorf("tag %s not found", indexRule.Tags[0]) } result := &distributedPlan{ queryTemplate: temp, s: s, sortByTime: false, sortTagSpec: *sortTagSpec, } if ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC { result.desc = true } return result, nil } type distributedPlan struct { s logical.Schema queryTemplate *measurev1.QueryRequest sortTagSpec logical.TagSpec sortByTime bool desc bool maxDataPointsSize uint32 } func (t *distributedPlan) Execute(ctx context.Context) (mi executor.MIterator, err error) { dctx := executor.FromDistributedExecutionContext(ctx) queryRequest := proto.Clone(t.queryTemplate).(*measurev1.QueryRequest) queryRequest.TimeRange = dctx.TimeRange() if t.maxDataPointsSize > 0 { queryRequest.Limit = t.maxDataPointsSize } tracer := query.GetTracer(ctx) var span *query.Span if tracer != nil { span, _ = tracer.StartSpan(ctx, "distributed-client") queryRequest.Trace = true span.Tag("request", convert.BytesToString(logger.Proto(queryRequest))) span.Tag("node_selectors", fmt.Sprintf("%v", dctx.NodeSelectors())) span.Tag("time_range", dctx.TimeRange().String()) defer func() { if err != nil { span.Error(err) } else { span.Stop() } }() } ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicMeasureQuery, bus.NewMessageWithNodeSelectors(bus.MessageID(dctx.TimeRange().Begin.Nanos), dctx.NodeSelectors(), dctx.TimeRange(), queryRequest)) if err != nil { return nil, err } var see []sort.Iterator[*comparableDataPoint] for _, f := range ff { if m, getErr := f.Get(); getErr != nil { err = multierr.Append(err, getErr) } else { d := m.Data() if d == nil { continue } resp := d.(*measurev1.QueryResponse) if span != nil { span.AddSubTrace(resp.Trace) } see = append(see, newSortableElements(resp.DataPoints, t.sortByTime, t.sortTagSpec)) } } smi := &sortedMIterator{ Iterator: sort.NewItemIter(see, t.desc), } smi.init() return smi, err } func (t *distributedPlan) String() string { return fmt.Sprintf("distributed:%s", t.queryTemplate.String()) } func (t *distributedPlan) Children() []logical.Plan { return []logical.Plan{} } func (t *distributedPlan) Schema() logical.Schema { return t.s } func (t *distributedPlan) Limit(maxVal int) { t.maxDataPointsSize = uint32(maxVal) } var _ sort.Comparable = (*comparableDataPoint)(nil) type comparableDataPoint struct { *measurev1.DataPoint sortField []byte } func newComparableElement(e *measurev1.DataPoint, sortByTime bool, sortTagSpec logical.TagSpec) (*comparableDataPoint, error) { var sortField []byte if sortByTime { sortField = convert.Uint64ToBytes(uint64(e.Timestamp.AsTime().UnixNano())) } else { var err error sortField, err = pbv1.MarshalTagValue(e.TagFamilies[sortTagSpec.TagFamilyIdx].Tags[sortTagSpec.TagIdx].Value) if err != nil { return nil, err } } return &comparableDataPoint{ DataPoint: e, sortField: sortField, }, nil } func (e *comparableDataPoint) SortedField() []byte { return e.sortField } var _ sort.Iterator[*comparableDataPoint] = (*sortableElements)(nil) type sortableElements struct { cur *comparableDataPoint dataPoints []*measurev1.DataPoint sortTagSpec logical.TagSpec index int isSortByTime bool } func newSortableElements(dataPoints []*measurev1.DataPoint, isSortByTime bool, sortTagSpec logical.TagSpec) *sortableElements { return &sortableElements{ dataPoints: dataPoints, isSortByTime: isSortByTime, sortTagSpec: sortTagSpec, } } func (*sortableElements) Close() error { return nil } func (s *sortableElements) Next() bool { return s.iter(func(e *measurev1.DataPoint) (*comparableDataPoint, error) { return newComparableElement(e, s.isSortByTime, s.sortTagSpec) }) } func (s *sortableElements) Val() *comparableDataPoint { return s.cur } func (s *sortableElements) iter(fn func(*measurev1.DataPoint) (*comparableDataPoint, error)) bool { if s.index >= len(s.dataPoints) { return false } cur, err := fn(s.dataPoints[s.index]) s.index++ if err != nil { return s.iter(fn) } s.cur = cur return s.index <= len(s.dataPoints) } var _ executor.MIterator = (*sortedMIterator)(nil) type sortedMIterator struct { sort.Iterator[*comparableDataPoint] data *list.List uniqueData map[uint64]*measurev1.DataPoint cur *measurev1.DataPoint initialized bool closed bool } func (s *sortedMIterator) init() { if s.initialized { return } s.initialized = true if !s.Iterator.Next() { s.closed = true return } s.data = list.New() s.uniqueData = make(map[uint64]*measurev1.DataPoint) s.loadDps() } func (s *sortedMIterator) Next() bool { if s.data == nil { return false } if s.data.Len() == 0 { s.loadDps() if s.data.Len() == 0 { return false } } dp := s.data.Front() s.data.Remove(dp) s.cur = dp.Value.(*measurev1.DataPoint) return true } func (s *sortedMIterator) loadDps() { if s.closed { return } for k := range s.uniqueData { delete(s.uniqueData, k) } first := s.Iterator.Val() s.uniqueData[hashDataPoint(first.DataPoint)] = first.DataPoint for { if !s.Iterator.Next() { s.closed = true break } v := s.Iterator.Val() if bytes.Equal(first.SortedField(), v.SortedField()) { key := hashDataPoint(v.DataPoint) if existed, ok := s.uniqueData[key]; ok { if v.DataPoint.Version > existed.Version { s.uniqueData[key] = v.DataPoint } } else { s.uniqueData[key] = v.DataPoint } } else { break } } for _, v := range s.uniqueData { s.data.PushBack(v) } } func (s *sortedMIterator) Current() []*measurev1.DataPoint { return []*measurev1.DataPoint{s.cur} } const ( offset64 = 14695981039346656037 prime64 = 1099511628211 ) // hashDataPoint calculates the hash value of a data point with fnv64a. // https://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function func hashDataPoint(dp *measurev1.DataPoint) uint64 { h := uint64(offset64) h = (h ^ dp.Sid) * prime64 h = (h ^ uint64(dp.Timestamp.Seconds)) * prime64 h = (h ^ uint64(dp.Timestamp.Nanos)) * prime64 return h }