pkg/partition/entity.go (74 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // Package partition implements a location system to find a shard or index rule. // This system reflects the entity identity which is a intermediate result in calculating the target shard. package partition import ( "github.com/pkg/errors" "github.com/apache/skywalking-banyandb/api/common" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" ) // ErrMalformedElement indicates the element is malformed. var ErrMalformedElement = errors.New("element is malformed") // Locator combines several TagLocators that help find the entity or sharding key value. type Locator struct { TagLocators []TagLocator ModRevision int64 } // TagLocator contains offsets to retrieve a tag swiftly. type TagLocator struct { FamilyOffset int TagOffset int } // NewEntityLocator return a Locator based on tag family spec and entity spec. func NewEntityLocator(families []*databasev1.TagFamilySpec, entity *databasev1.Entity, modRevision int64) Locator { locator := make([]TagLocator, 0, len(entity.GetTagNames())) for _, tagInEntity := range entity.GetTagNames() { fIndex, tIndex, tag := pbv1.FindTagByName(families, tagInEntity) if tag != nil { locator = append(locator, TagLocator{FamilyOffset: fIndex, TagOffset: tIndex}) } } return Locator{TagLocators: locator, ModRevision: modRevision} } // NewShardingKeyLocator returns a Locator based on tag family spec and sharding key spec. func NewShardingKeyLocator(families []*databasev1.TagFamilySpec, shardingKey *databasev1.ShardingKey) Locator { locator := make([]TagLocator, 0, len(shardingKey.GetTagNames())) for _, tagInShardingKey := range shardingKey.GetTagNames() { fIndex, tIndex, tag := pbv1.FindTagByName(families, tagInShardingKey) if tag != nil { locator = append(locator, TagLocator{FamilyOffset: fIndex, TagOffset: tIndex}) } } return Locator{TagLocators: locator} } // Find the entity from a tag family, prepend a subject to the entity. func (l Locator) Find(subject string, value []*modelv1.TagFamilyForWrite) (pbv1.Entity, pbv1.EntityValues, error) { entityValues := make(pbv1.EntityValues, len(l.TagLocators)+1) entityValues[0] = pbv1.EntityStrValue(subject) for i, index := range l.TagLocators { tag, err := GetTagByOffset(value, index.FamilyOffset, index.TagOffset) if err != nil { return nil, nil, err } entityValues[i+1] = tag } entity, err := entityValues.ToEntity() if err != nil { return nil, nil, err } return entity, entityValues, nil } // Locate a shard and find the entity or sharding key from a tag family, prepend a subject to the entity or sharding key. func (l Locator) Locate(subject string, value []*modelv1.TagFamilyForWrite, shardNum uint32) (pbv1.Entity, pbv1.EntityValues, common.ShardID, error) { entity, tagValues, err := l.Find(subject, value) if err != nil { return nil, nil, 0, err } id, err := ShardID(entity.Marshal(), shardNum) if err != nil { return nil, nil, 0, err } return entity, tagValues, common.ShardID(id), nil } // GetTagByOffset gets a tag value based of a tag family offset and a tag offset in this family. func GetTagByOffset(value []*modelv1.TagFamilyForWrite, fIndex, tIndex int) (*modelv1.TagValue, error) { if fIndex >= len(value) { return nil, errors.Wrap(ErrMalformedElement, "tag family offset is invalid") } family := value[fIndex] if tIndex >= len(family.GetTags()) { return nil, errors.Wrap(ErrMalformedElement, "tag offset is invalid") } return family.GetTags()[tIndex], nil }