banyand/measure/measure_query.go (143 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package measure import ( "io" "time" "github.com/pkg/errors" "google.golang.org/protobuf/proto" "github.com/apache/skywalking-banyandb/api/common" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/partition" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema" ) var errTagFamilyNotExist = errors.New("tag family doesn't exist") // Query allow to retrieve measure data points. type Query interface { LoadGroup(name string) (resourceSchema.Group, bool) Measure(measure *commonv1.Metadata) (Measure, error) } // Measure allows inspecting measure data points' details. type Measure interface { io.Closer Shards(entity tsdb.Entity) ([]tsdb.Shard, error) CompanionShards(metadata *commonv1.Metadata) ([]tsdb.Shard, error) Shard(id common.ShardID) (tsdb.Shard, error) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) ParseField(name string, item tsdb.Item) (*measurev1.DataPoint_Field, error) GetSchema() *databasev1.Measure GetIndexRules() []*databasev1.IndexRule GetInterval() time.Duration } var _ Measure = (*measure)(nil) func (s *measure) GetInterval() time.Duration { return s.interval } func (s *measure) Shards(entity tsdb.Entity) ([]tsdb.Shard, error) { wrap := func(shards []tsdb.Shard) []tsdb.Shard { result := make([]tsdb.Shard, len(shards)) for i := 0; i < len(shards); i++ { result[i] = tsdb.NewScopedShard(tsdb.Entry(s.name), shards[i]) } return result } db := s.databaseSupplier.SupplyTSDB() if len(entity) < 1 { return wrap(db.Shards()), nil } for _, e := range entity { if e == nil { return wrap(db.Shards()), nil } } shardID, err := partition.ShardID(entity.Prepend(tsdb.Entry(s.name)).Marshal(), s.shardNum) if err != nil { return nil, err } shard, err := s.databaseSupplier.SupplyTSDB().Shard(common.ShardID(shardID)) if err != nil { if errors.Is(err, tsdb.ErrUnknownShard) { return []tsdb.Shard{}, nil } return nil, err } return []tsdb.Shard{tsdb.NewScopedShard(tsdb.Entry(s.name), shard)}, nil } func (s *measure) CompanionShards(metadata *commonv1.Metadata) ([]tsdb.Shard, error) { wrap := func(shards []tsdb.Shard) []tsdb.Shard { result := make([]tsdb.Shard, len(shards)) for i := 0; i < len(shards); i++ { result[i] = tsdb.NewScopedShard(tsdb.Entry(formatMeasureCompanionPrefix(s.name, metadata.GetName())), shards[i]) } return result } db := s.databaseSupplier.SupplyTSDB() return wrap(db.Shards()), nil } func formatMeasureCompanionPrefix(measureName, name string) string { return measureName + "." + name } func (s *measure) Shard(id common.ShardID) (tsdb.Shard, error) { shard, err := s.databaseSupplier.SupplyTSDB().Shard(id) if err != nil { return nil, err } return tsdb.NewScopedShard(tsdb.Entry(s.name), shard), nil } func (s *measure) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) { fid := familyIdentity(family, pbv1.TagFlag) familyRawBytes, err := item.Family(fid) if err != nil { return nil, errors.Wrapf(err, "measure %s.%s parse family %s", s.name, s.group, family) } tagFamily := &modelv1.TagFamilyForWrite{} err = proto.Unmarshal(familyRawBytes, tagFamily) if err != nil { return nil, err } tags := make([]*modelv1.Tag, len(tagFamily.GetTags())) var tagSpec []*databasev1.TagSpec for _, tf := range s.schema.GetTagFamilies() { if tf.GetName() == family { tagSpec = tf.GetTags() } } if tagSpec == nil { return nil, errTagFamilyNotExist } for i, tag := range tagFamily.GetTags() { tags[i] = &modelv1.Tag{ Key: tagSpec[i].GetName(), Value: &modelv1.TagValue{ Value: tag.GetValue(), }, } } return &modelv1.TagFamily{ Name: family, Tags: tags, }, err } func (s *measure) ParseField(name string, item tsdb.Item) (*measurev1.DataPoint_Field, error) { var fieldSpec *databasev1.FieldSpec for _, spec := range s.schema.GetFields() { if spec.GetName() == name { fieldSpec = spec break } } fid := familyIdentity(name, pbv1.EncoderFieldFlag(fieldSpec, s.interval)) bytes, err := item.Family(fid) if err != nil { return nil, errors.Wrapf(err, "measure %s.%s parse field %s", s.name, s.group, name) } fieldValue, err := pbv1.DecodeFieldValue(bytes, fieldSpec) if err != nil { return nil, err } return &measurev1.DataPoint_Field{ Name: name, Value: fieldValue, }, err }