banyand/metadata/schema/property.go (146 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package schema
import (
"context"
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
)
const all = "*"
var propertyKeyPrefix = "/properties/"
func (e *etcdSchemaRegistry) GetProperty(ctx context.Context, metadata *propertyv1.Metadata, tags []string) (*propertyv1.Property, error) {
var entity propertyv1.Property
if err := e.get(ctx, formatPropertyKey(transformKey(metadata)), &entity); err != nil {
return nil, err
}
return filterTags(&entity, tags), nil
}
func filterTags(property *propertyv1.Property, tags []string) *propertyv1.Property {
if len(tags) == 0 || tags[0] == all {
return property
}
filtered := &propertyv1.Property{
Metadata: property.Metadata,
UpdatedAt: property.UpdatedAt,
}
for _, expectedTag := range tags {
for _, t := range property.Tags {
if t.Key == expectedTag {
filtered.Tags = append(filtered.Tags, t)
}
}
}
return filtered
}
func (e *etcdSchemaRegistry) ListProperty(ctx context.Context, container *commonv1.Metadata, ids []string, tags []string) ([]*propertyv1.Property, error) {
if container.Group == "" {
return nil, BadRequest("container.group", "group should not be empty")
}
messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(container.Group, propertyKeyPrefix+container.Name), func() proto.Message {
return &propertyv1.Property{}
})
if err != nil {
return nil, err
}
entities := make([]*propertyv1.Property, 0, len(messages))
for _, message := range messages {
p := message.(*propertyv1.Property)
if len(ids) < 1 || ids[0] == all {
entities = append(entities, filterTags(p, tags))
continue
}
for _, id := range ids {
if p.Metadata.Id == id {
entities = append(entities, filterTags(p, tags))
}
}
}
return entities, nil
}
func (e *etcdSchemaRegistry) ApplyProperty(ctx context.Context, property *propertyv1.Property, strategy propertyv1.ApplyRequest_Strategy) (bool, uint32, error) {
m := transformKey(property.GetMetadata())
group := m.GetGroup()
if _, getGroupErr := e.GetGroup(ctx, group); getGroupErr != nil {
return false, 0, errors.Wrap(getGroupErr, "group is not exist")
}
if property.UpdatedAt != nil {
property.UpdatedAt = timestamppb.Now()
}
md := Metadata{
TypeMeta: TypeMeta{
Kind: KindProperty,
Group: group,
Name: m.GetName(),
},
Spec: property,
}
tagsNum := uint32(len(property.Tags))
err := e.create(ctx, md)
if err == nil {
return true, tagsNum, nil
}
if !errors.Is(err, errGRPCAlreadyExists) {
return false, 0, err
}
if strategy != propertyv1.ApplyRequest_STRATEGY_REPLACE {
existed, errGet := e.GetProperty(ctx, property.Metadata, nil)
if errGet != nil {
return false, 0, errGet
}
for i := 0; i < int(tagsNum); i++ {
t := property.Tags[0]
property.Tags = property.Tags[1:]
for _, et := range existed.Tags {
if et.Key == t.Key {
et.Value = t.Value
}
}
}
existed.Tags = append(existed.Tags, property.Tags...)
md.Spec = existed
}
if err = e.update(ctx, md); err != nil {
return false, 0, err
}
return false, tagsNum, nil
}
func (e *etcdSchemaRegistry) DeleteProperty(ctx context.Context, metadata *propertyv1.Metadata, tags []string) (bool, uint32, error) {
if len(tags) == 0 || tags[0] == all {
m := transformKey(metadata)
deleted, err := e.delete(ctx, Metadata{
TypeMeta: TypeMeta{
Kind: KindProperty,
Group: m.GetGroup(),
Name: m.GetName(),
},
})
return deleted, 0, err
}
property, err := e.GetProperty(ctx, metadata, nil)
if err != nil {
return false, 0, err
}
filtered := &propertyv1.Property{
Metadata: property.Metadata,
UpdatedAt: property.UpdatedAt,
}
for _, expectedTag := range tags {
for _, t := range property.Tags {
if t.Key != expectedTag {
filtered.Tags = append(filtered.Tags, t)
}
}
}
_, num, err := e.ApplyProperty(ctx, filtered, propertyv1.ApplyRequest_STRATEGY_REPLACE)
return true, num, err
}
func transformKey(metadata *propertyv1.Metadata) *commonv1.Metadata {
return &commonv1.Metadata{
Group: metadata.Container.GetGroup(),
Name: metadata.Container.Name + "/" + metadata.Id,
}
}
func formatPropertyKey(metadata *commonv1.Metadata) string {
return formatKey(propertyKeyPrefix, metadata)
}