banyand/stream/stream_query.go (103 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package stream
import (
"io"
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"
"github.com/apache/skywalking-banyandb/api/common"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/partition"
)
var errTagFamilyNotExist = errors.New("tag family doesn't exist")
// Query allow to retrieve elements in a series of streams.
type Query interface {
Stream(stream *commonv1.Metadata) (Stream, error)
}
// Stream allows inspecting elements' details.
type Stream interface {
io.Closer
Shards(entity tsdb.Entity) ([]tsdb.Shard, error)
Shard(id common.ShardID) (tsdb.Shard, error)
ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error)
ParseElementID(item tsdb.Item) (string, error)
GetSchema() *databasev1.Stream
GetIndexRules() []*databasev1.IndexRule
}
var _ Stream = (*stream)(nil)
func (s *stream) Shards(entity tsdb.Entity) ([]tsdb.Shard, error) {
wrap := func(shards []tsdb.Shard) []tsdb.Shard {
result := make([]tsdb.Shard, len(shards))
for i := 0; i < len(shards); i++ {
result[i] = tsdb.NewScopedShard(tsdb.Entry(s.name), shards[i])
}
return result
}
db := s.db.SupplyTSDB()
if len(entity) < 1 {
return wrap(db.Shards()), nil
}
for _, e := range entity {
if e == nil {
return wrap(db.Shards()), nil
}
}
shardID, err := partition.ShardID(entity.Prepend(tsdb.Entry(s.name)).Marshal(), s.shardNum)
if err != nil {
return nil, err
}
shard, err := db.Shard(common.ShardID(shardID))
if err != nil {
if errors.Is(err, tsdb.ErrUnknownShard) {
return []tsdb.Shard{}, nil
}
return nil, err
}
return []tsdb.Shard{tsdb.NewScopedShard(tsdb.Entry(s.name), shard)}, nil
}
func (s *stream) Shard(id common.ShardID) (tsdb.Shard, error) {
shard, err := s.db.SupplyTSDB().Shard(id)
if err != nil {
return nil, err
}
return tsdb.NewScopedShard(tsdb.Entry(s.name), shard), nil
}
func (s *stream) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) {
familyRawBytes, err := item.Family(tsdb.Hash([]byte(family)))
if err != nil {
return nil, errors.Wrapf(err, "stream %s.%s parse family %s", s.name, s.group, family)
}
tagFamily := &modelv1.TagFamilyForWrite{}
err = proto.Unmarshal(familyRawBytes, tagFamily)
if err != nil {
return nil, err
}
tags := make([]*modelv1.Tag, len(tagFamily.GetTags()))
var tagSpec []*databasev1.TagSpec
for _, tf := range s.schema.GetTagFamilies() {
if tf.GetName() == family {
tagSpec = tf.GetTags()
}
}
if tagSpec == nil {
return nil, errTagFamilyNotExist
}
for i, tag := range tagFamily.GetTags() {
tags[i] = &modelv1.Tag{
Key: tagSpec[i].GetName(),
Value: &modelv1.TagValue{
Value: tag.GetValue(),
},
}
}
return &modelv1.TagFamily{
Name: family,
Tags: tags,
}, err
}
func (s *stream) ParseElementID(item tsdb.Item) (string, error) {
rawBytes, err := item.Val()
if err != nil {
return "", err
}
return string(rawBytes), nil
}