pkg/query/logical/common.go (165 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package logical import ( "bytes" "context" "io" "time" "github.com/pkg/errors" "go.uber.org/multierr" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/query/executor" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) var ( errTagNotDefined = errors.New("tag is not defined") errUnsupportedConditionOp = errors.New("unsupported condition operation") errUnsupportedConditionValue = errors.New("unsupported condition value type") errInvalidCriteriaType = errors.New("invalid criteria type") errIndexNotDefined = errors.New("index is not define for the tag") errInvalidData = errors.New("data is invalid") nullTag = &modelv1.TagValue{Value: &modelv1.TagValue_Null{}} ) type ( // SeekerBuilder wraps the execution of tsdb.SeekerBuilder. // TODO:// we could have a chance to remove this wrapper. SeekerBuilder func(builder tsdb.SeekerBuilder) comparator func(a, b tsdb.Item) bool ) func createComparator(sortDirection modelv1.Sort) comparator { return func(a, b tsdb.Item) bool { comp := bytes.Compare(a.SortedField(), b.SortedField()) if sortDirection == modelv1.Sort_SORT_DESC { return comp == 1 } return comp == -1 } } // ProjectItem parses the item within the StreamExecutionContext. // projectionFieldRefs must be prepared before calling this method, projectionFieldRefs should be a list of // tag list where the inner list must exist in the same tag family. // Strict order can be guaranteed in the result. func ProjectItem(ec executor.ExecutionContext, item tsdb.Item, projectionFieldRefs [][]*TagRef) ([]*modelv1.TagFamily, error) { tagFamily := make([]*modelv1.TagFamily, len(projectionFieldRefs)) for i, refs := range projectionFieldRefs { if len(refs) == 0 { continue } tags := make([]*modelv1.Tag, len(refs)) familyName := refs[0].Tag.getFamilyName() parsedTagFamily, err := ec.ParseTagFamily(familyName, item) if err != nil { return nil, errors.WithMessage(err, "parse projection") } if len(refs) > len(parsedTagFamily.Tags) { return nil, errors.Wrapf(errInvalidData, "the number of tags %d in %s is less then expected %d", len(parsedTagFamily.Tags), familyName, len(refs)) } for j, ref := range refs { if len(parsedTagFamily.GetTags()) > ref.Spec.TagIdx { tags[j] = parsedTagFamily.GetTags()[ref.Spec.TagIdx] } else { tags[j] = &modelv1.Tag{Key: ref.Tag.name, Value: nullTag} } } tagFamily[i] = &modelv1.TagFamily{ Name: familyName, Tags: tags, } } return tagFamily, nil } // ExecuteForShard fetches elements from series within a single shard. A list of series must be prepared in advanced // with the help of Entity. The result is a list of element set, where the order of inner list is kept // as what the users specify in the seekerBuilder. // This method is used by the underlying tableScan and localIndexScan plans. func ExecuteForShard(l *logger.Logger, series tsdb.SeriesList, timeRange timestamp.TimeRange, builders ...SeekerBuilder, ) ([]tsdb.Iterator, []io.Closer, error) { var itersInShard []tsdb.Iterator var closers []io.Closer for _, seriesFound := range series { itersInSeries, err := func() ([]tsdb.Iterator, error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() sp, errInner := seriesFound.Span(context.WithValue(ctx, logger.ContextKey, l), timeRange) if errInner != nil { if errors.Is(errInner, tsdb.ErrEmptySeriesSpan) { return nil, nil } return nil, errInner } closers = append(closers, sp) b := sp.SeekerBuilder() for _, builder := range builders { builder(b) } seeker, errInner := b.Build() if errInner != nil { return nil, errInner } iters, errInner := seeker.Seek() if errInner != nil { return nil, errInner } return iters, nil }() if err != nil { if len(closers) > 0 { for _, closer := range closers { err = multierr.Append(err, closer.Close()) } } return nil, nil, err } if len(itersInSeries) > 0 { itersInShard = append(itersInShard, itersInSeries...) } } return itersInShard, closers, nil } // Tag represents the combination of tag family and tag name. // It's a tag's identity. type Tag struct { familyName, name string } // NewTag return a new Tag. func NewTag(family, name string) *Tag { return &Tag{ familyName: family, name: name, } } // NewTags create an array of Tag within a TagFamily. func NewTags(family string, tagNames ...string) []*Tag { tags := make([]*Tag, len(tagNames)) for i, name := range tagNames { tags[i] = NewTag(family, name) } return tags } // GetCompoundName is only used for error message. func (t *Tag) GetCompoundName() string { return t.familyName + ":" + t.name } func (t *Tag) getTagName() string { return t.name } func (t *Tag) getFamilyName() string { return t.familyName } // ToTags converts a projection spec to Tag sets. func ToTags(projection *modelv1.TagProjection) [][]*Tag { projTags := make([][]*Tag, len(projection.GetTagFamilies())) for i, tagFamily := range projection.GetTagFamilies() { var projTagInFamily []*Tag for _, tagName := range tagFamily.GetTags() { projTagInFamily = append(projTagInFamily, NewTag(tagFamily.GetName(), tagName)) } projTags[i] = projTagInFamily } return projTags } // Field identity a field in a measure. type Field struct { Name string } // NewField return a new Field. func NewField(name string) *Field { return &Field{Name: name} } // StringSlicesEqual reports whether a and b are the same length and contain the same strings. // A nil argument is equivalent to an empty slice. func StringSlicesEqual(a, b []string) bool { if len(a) != len(b) { return false } for i, v := range a { if v != b[i] { return false } } return true }