banyand/metadata/schema/group.go (84 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package schema
import (
"context"
"strings"
"github.com/pkg/errors"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
)
var (
groupsKeyPrefix = "/groups/"
groupMetadataKey = "/__meta_group__"
)
func (e *etcdSchemaRegistry) GetGroup(ctx context.Context, group string) (*commonv1.Group, error) {
var entity commonv1.Group
err := e.get(ctx, formatGroupKey(group), &entity)
if err != nil {
return nil, errors.WithMessagef(err, "GetGroup[%s]", group)
}
return &entity, nil
}
func (e *etcdSchemaRegistry) ListGroup(ctx context.Context) ([]*commonv1.Group, error) {
messages, err := e.client.Get(ctx, groupsKeyPrefix, clientv3.WithFromKey(), clientv3.WithRange(incrementLastByte(groupsKeyPrefix)))
if err != nil {
return nil, err
}
var groups []*commonv1.Group
for _, kv := range messages.Kvs {
// kv.Key = "/groups/" + {group} + "/__meta_info__"
if strings.HasSuffix(string(kv.Key), groupMetadataKey) {
message := &commonv1.Group{}
if innerErr := proto.Unmarshal(kv.Value, message); innerErr != nil {
return nil, innerErr
}
groups = append(groups, message)
}
}
return groups, nil
}
func (e *etcdSchemaRegistry) DeleteGroup(ctx context.Context, group string) (bool, error) {
g, err := e.GetGroup(ctx, group)
if err != nil {
return false, errors.Wrap(err, group)
}
keyPrefix := groupsKeyPrefix + g.GetMetadata().GetName() + "/"
resp, err := e.client.Delete(ctx, keyPrefix, clientv3.WithRange(incrementLastByte(keyPrefix)))
if err != nil {
return false, err
}
if resp.Deleted > 0 {
e.notifyDelete(Metadata{
TypeMeta: TypeMeta{
Kind: KindGroup,
Name: group,
},
Spec: g,
})
}
return true, nil
}
func (e *etcdSchemaRegistry) CreateGroup(ctx context.Context, group *commonv1.Group) error {
if group.UpdatedAt != nil {
group.UpdatedAt = timestamppb.Now()
}
return e.create(ctx, Metadata{
TypeMeta: TypeMeta{
Kind: KindGroup,
Name: group.GetMetadata().GetName(),
},
Spec: group,
})
}
func (e *etcdSchemaRegistry) UpdateGroup(ctx context.Context, group *commonv1.Group) error {
return e.update(ctx, Metadata{
TypeMeta: TypeMeta{
Kind: KindGroup,
Name: group.GetMetadata().GetName(),
},
Spec: group,
})
}
func formatGroupKey(group string) string {
return groupsKeyPrefix + group + groupMetadataKey
}