banyand/metadata/schema/etcd.go (297 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package schema
import (
"context"
"sync"
"time"
"github.com/pkg/errors"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
)
var (
// Compile-time assertions that *etcdSchemaRegistry implements each of the
// schema interfaces listed below.
_ Stream = (*etcdSchemaRegistry)(nil)
_ IndexRuleBinding = (*etcdSchemaRegistry)(nil)
_ IndexRule = (*etcdSchemaRegistry)(nil)
_ Measure = (*etcdSchemaRegistry)(nil)
_ Group = (*etcdSchemaRegistry)(nil)
_ Property = (*etcdSchemaRegistry)(nil)
// errUnexpectedNumberOfEntities is returned when a lookup of a single key
// reports more than one entry.
errUnexpectedNumberOfEntities = errors.New("unexpected number of entities")
// errConcurrentModification is returned by update when its
// compare-on-ModRevision transaction does not succeed.
errConcurrentModification = errors.New("concurrent modification of entities")
)
// HasMetadata is a protobuf message that also exposes a common Metadata.
// get and listWithPrefix use it to fill in the revision fields after decoding.
type HasMetadata interface {
// GetMetadata returns the Metadata carried by the message.
GetMetadata() *commonv1.Metadata
proto.Message
}
// RegistryOption is the option to create Registry.
// Each option mutates the etcdSchemaRegistryConfig that
// NewEtcdSchemaRegistry builds before connecting.
type RegistryOption func(*etcdSchemaRegistryConfig)
// ConfigureServerEndpoints returns a RegistryOption that stores url as the
// list of server endpoints in the registry configuration.
func ConfigureServerEndpoints(url []string) RegistryOption {
	apply := func(cfg *etcdSchemaRegistryConfig) {
		cfg.serverEndpoints = url
	}
	return apply
}
// eventHandler pairs a registered EventHandler with the kinds it wants to be
// notified about.
type eventHandler struct {
// handler receives the OnAddOrUpdate and OnDelete callbacks.
handler EventHandler
// interestKeys is the kind value supplied at registration; interestOf
// tests it as a bitmask against the kind of an incoming event.
interestKeys Kind
}
// interestOf reports whether kind, restricted to KindMask, shares at least
// one bit with the kinds this handler was registered for.
func (eh *eventHandler) interestOf(kind Kind) bool {
	overlap := KindMask & kind & eh.interestKeys
	return overlap != 0
}
// etcdSchemaRegistry is a schema registry backed by an etcd v3 client.
type etcdSchemaRegistry struct {
// client is the etcd client used for every read and write.
client *clientv3.Client
// closer brackets each operation with AddRunning/Done and is waited on in Close.
closer *run.Closer
// handlers holds the registered event handlers; access is guarded by mux.
handlers []*eventHandler
// mux protects handlers.
mux sync.RWMutex
}
// etcdSchemaRegistryConfig collects the settings applied by RegistryOption
// values before NewEtcdSchemaRegistry creates the etcd client.
type etcdSchemaRegistryConfig struct {
// serverEndpoints is passed to the etcd client as its Endpoints list.
serverEndpoints []string
}
// RegisterHandler appends handler to the registry's handler list, recording
// kind as the kinds it is interested in. The list is modified under the
// write lock.
func (e *etcdSchemaRegistry) RegisterHandler(kind Kind, handler EventHandler) {
	registration := &eventHandler{
		handler:      handler,
		interestKeys: kind,
	}
	e.mux.Lock()
	e.handlers = append(e.handlers, registration)
	e.mux.Unlock()
}
// notifyUpdate calls OnAddOrUpdate on every registered handler that is
// interested in metadata.Kind. The handler slice is read under the read lock;
// the callbacks themselves run after the lock has been released.
func (e *etcdSchemaRegistry) notifyUpdate(metadata Metadata) {
	e.mux.RLock()
	snapshot := e.handlers
	e.mux.RUnlock()
	for i := range snapshot {
		if !snapshot[i].interestOf(metadata.Kind) {
			continue
		}
		snapshot[i].handler.OnAddOrUpdate(metadata)
	}
}
// notifyDelete calls OnDelete on every registered handler that is interested
// in metadata.Kind. The handler slice is read under the read lock; the
// callbacks themselves run after the lock has been released.
func (e *etcdSchemaRegistry) notifyDelete(metadata Metadata) {
	e.mux.RLock()
	snapshot := e.handlers
	e.mux.RUnlock()
	for i := range snapshot {
		if !snapshot[i].interestOf(metadata.Kind) {
			continue
		}
		snapshot[i].handler.OnDelete(metadata)
	}
}
// Close marks one closer task as done, closes the closer and waits on it,
// then closes the etcd client and returns the client's close error.
func (e *etcdSchemaRegistry) Close() error {
	// NOTE(review): this Done presumably balances the initial count given to
	// run.NewCloser(1) in NewEtcdSchemaRegistry — confirm against run.Closer.
	e.closer.Done()
	e.closer.CloseThenWait()
	closeErr := e.client.Close()
	return closeErr
}
// NewEtcdSchemaRegistry returns a Registry powered by Etcd.
//
// The options are applied in order to a fresh configuration. An error is
// returned when no server endpoint has been configured, when the zap logger
// for the etcd client cannot be built, or when the etcd client cannot be
// created.
func NewEtcdSchemaRegistry(options ...RegistryOption) (Registry, error) {
	registryConfig := &etcdSchemaRegistryConfig{}
	for _, opt := range options {
		opt(registryConfig)
	}
	// len() rejects both a nil slice and an empty, non-nil one; a bare nil
	// comparison would let ConfigureServerEndpoints([]string{}) through.
	if len(registryConfig.serverEndpoints) == 0 {
		return nil, errors.New("server address is not set")
	}
	zapCfg := logger.GetLogger("etcd").ToZapConfig()
	l, err := zapCfg.Build()
	if err != nil {
		return nil, err
	}
	client, err := clientv3.New(clientv3.Config{
		Endpoints:            registryConfig.serverEndpoints,
		DialTimeout:          5 * time.Second,
		DialKeepAliveTime:    30 * time.Second,
		DialKeepAliveTimeout: 10 * time.Second,
		// WithBlock makes the dial wait for the connection instead of
		// returning immediately.
		DialOptions: []grpc.DialOption{grpc.WithBlock()},
		Logger:      l,
	})
	if err != nil {
		return nil, err
	}
	reg := &etcdSchemaRegistry{
		client: client,
		closer: run.NewCloser(1),
	}
	return reg, nil
}
// get loads the value stored under key and decodes it into message.
//
// It returns ErrClosed when the closer refuses a new running task,
// ErrGRPCResourceNotFound when the key has no value,
// errUnexpectedNumberOfEntities when more than one entry is reported, and any
// error from the etcd client or from protobuf decoding. When message
// implements HasMetadata and carries a non-nil Metadata, its CreateRevision
// and ModRevision are overwritten with the revisions reported by etcd.
func (e *etcdSchemaRegistry) get(ctx context.Context, key string, message proto.Message) error {
	if !e.closer.AddRunning() {
		return ErrClosed
	}
	defer e.closer.Done()
	resp, err := e.client.Get(ctx, key)
	if err != nil {
		return err
	}
	if resp.Count == 0 {
		return ErrGRPCResourceNotFound
	}
	if resp.Count > 1 {
		return errUnexpectedNumberOfEntities
	}
	kv := resp.Kvs[0]
	if err = proto.Unmarshal(kv.Value, message); err != nil {
		return err
	}
	if messageWithMetadata, ok := message.(HasMetadata); ok {
		// GetMetadata returns nil when the stored message has no metadata;
		// guard so the read-only fields are not assigned through a nil pointer.
		if md := messageWithMetadata.GetMetadata(); md != nil {
			md.CreateRevision = kv.CreateRevision
			md.ModRevision = kv.ModRevision
		}
	}
	return nil
}
// update overwrites an entity that already exists under the key derived from
// metadata.
//
// It returns ErrGRPCResourceNotFound when the key holds no value, and nil
// without writing when the stored value equals the new one. The write itself
// is a transaction conditioned on the ModRevision observed by the preceding
// read; errConcurrentModification is returned when that condition fails.
// Handlers are notified only after a successful write.
func (e *etcdSchemaRegistry) update(ctx context.Context, metadata Metadata) error {
	if !e.closer.AddRunning() {
		return ErrClosed
	}
	defer e.closer.Done()
	key, err := metadata.key()
	if err != nil {
		return err
	}
	current, err := e.client.Get(ctx, key)
	if err != nil {
		return err
	}
	if current.Count > 1 {
		return errUnexpectedNumberOfEntities
	}
	payload, err := proto.Marshal(metadata.Spec.(proto.Message))
	if err != nil {
		return err
	}
	if current.Count <= 0 {
		return ErrGRPCResourceNotFound
	}
	stored := current.Kvs[0]
	previous, decodeErr := metadata.Unmarshal(stored.Value)
	if decodeErr != nil {
		return decodeErr
	}
	// Nothing to write, and nobody to notify, when the entity is unchanged.
	if metadata.equal(previous) {
		return nil
	}
	unchangedSinceRead := clientv3.Compare(clientv3.ModRevision(key), "=", stored.ModRevision)
	write := clientv3.OpPut(key, string(payload))
	outcome, commitErr := e.client.Txn(ctx).If(unchangedSinceRead).Then(write).Commit()
	if commitErr != nil {
		return commitErr
	}
	if !outcome.Succeeded {
		return errConcurrentModification
	}
	e.notifyUpdate(metadata)
	return nil
}
// create stores the entity described by metadata only if its key does not
// exist yet; otherwise it returns errGRPCAlreadyExists.
//
// The existence check and the write happen in one etcd transaction guarded by
// "CreateRevision == 0" (the create revision of a key that does not exist).
// A separate Get followed by Put would let two concurrent creators both pass
// the check, with the later Put silently overwriting the earlier one.
// Handlers are notified only after a successful write.
func (e *etcdSchemaRegistry) create(ctx context.Context, metadata Metadata) error {
	if !e.closer.AddRunning() {
		return ErrClosed
	}
	defer e.closer.Done()
	key, err := metadata.key()
	if err != nil {
		return err
	}
	val, err := proto.Marshal(metadata.Spec.(proto.Message))
	if err != nil {
		return err
	}
	txnResp, err := e.client.Txn(ctx).
		If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0)).
		Then(clientv3.OpPut(key, string(val))).
		Commit()
	if err != nil {
		return err
	}
	if !txnResp.Succeeded {
		// The guard failed, so the key already exists.
		return errGRPCAlreadyExists
	}
	e.notifyUpdate(metadata)
	return nil
}
// listWithPrefix returns every entity whose key starts with prefix.
//
// factory must return a fresh, empty message for each call; every stored value
// is decoded into one of them. When a decoded message implements HasMetadata
// and carries a non-nil Metadata, its CreateRevision and ModRevision are
// overwritten with the revisions reported by etcd. It returns ErrClosed when
// the closer refuses a new running task, and otherwise any error from the etcd
// client or from protobuf decoding.
func (e *etcdSchemaRegistry) listWithPrefix(ctx context.Context, prefix string, factory func() proto.Message) ([]proto.Message, error) {
	if !e.closer.AddRunning() {
		return nil, ErrClosed
	}
	defer e.closer.Done()
	// WithPrefix computes the range end inside the etcd client, including the
	// empty-prefix and trailing-0xff cases.
	resp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix())
	if err != nil {
		return nil, err
	}
	// Size and iterate by the entries actually returned rather than by
	// resp.Count, so the index can never run past resp.Kvs.
	entities := make([]proto.Message, len(resp.Kvs))
	for i, kv := range resp.Kvs {
		message := factory()
		if innerErr := proto.Unmarshal(kv.Value, message); innerErr != nil {
			return nil, innerErr
		}
		if messageWithMetadata, ok := message.(HasMetadata); ok {
			// GetMetadata returns nil when the stored message has no metadata.
			if md := messageWithMetadata.GetMetadata(); md != nil {
				md.CreateRevision = kv.CreateRevision
				md.ModRevision = kv.ModRevision
			}
		}
		entities[i] = message
	}
	return entities, nil
}
// listPrefixesForEntity builds the key prefix for one entity type inside a
// group: groupsKeyPrefix, then group, then entityPrefix.
func listPrefixesForEntity(group, entityPrefix string) string {
	groupPrefix := groupsKeyPrefix + group
	return groupPrefix + entityPrefix
}
// delete removes the entity identified by metadata from etcd.
//
// It returns true when exactly one key was removed and false when nothing was
// removed. It returns ErrClosed when the closer refuses a new running task,
// and otherwise any error from building the key or from the etcd client.
//
// After a successful removal, handlers interested in metadata.Kind are
// notified with the previously stored value. Notification is best-effort: it
// is skipped when the kind has no known spec type, when etcd returned no
// previous value, or when that value cannot be decoded. None of these change
// the returned result, because the key has already been removed.
func (e *etcdSchemaRegistry) delete(ctx context.Context, metadata Metadata) (bool, error) {
	if !e.closer.AddRunning() {
		return false, ErrClosed
	}
	defer e.closer.Done()
	key, err := metadata.key()
	if err != nil {
		return false, err
	}
	// WithPrevKV asks etcd to return the removed value so it can be passed to
	// the handlers.
	resp, err := e.client.Delete(ctx, key, clientv3.WithPrevKV())
	if err != nil {
		return false, err
	}
	if resp.Deleted != 1 {
		return false, nil
	}
	var message proto.Message
	switch metadata.Kind {
	case KindMeasure:
		message = &databasev1.Measure{}
	case KindStream:
		message = &databasev1.Stream{}
	case KindIndexRuleBinding:
		message = &databasev1.IndexRuleBinding{}
	case KindIndexRule:
		message = &databasev1.IndexRule{}
	case KindProperty:
		message = &propertyv1.Property{}
	case KindTopNAggregation:
		message = &databasev1.TopNAggregation{}
	default:
		// The key is already gone; there is no spec type to notify with, but
		// the deletion itself must still be reported.
		return true, nil
	}
	if len(resp.PrevKvs) == 0 {
		return true, nil
	}
	if unmarshalErr := proto.Unmarshal(resp.PrevKvs[0].Value, message); unmarshalErr == nil {
		e.notifyDelete(Metadata{
			TypeMeta: TypeMeta{
				Kind:  metadata.Kind,
				Name:  metadata.Name,
				Group: metadata.Group,
			},
			Spec: message,
		})
	}
	return true, nil
}
// formatKey builds the storage key of an entity: groupsKeyPrefix, the
// metadata's group, entityPrefix, then the metadata's name.
func formatKey(entityPrefix string, metadata *commonv1.Metadata) string {
	group, name := metadata.GetGroup(), metadata.GetName()
	return groupsKeyPrefix + group + entityPrefix + name
}
// incrementLastByte returns the exclusive range end that covers every key
// starting with key.
//
// Trailing 0xff bytes cannot be incremented, so they are dropped and the last
// byte below 0xff is incremented instead. When no such byte exists (key is
// empty or consists only of 0xff bytes) it returns "\x00", which etcd treats
// as "no upper bound". For keys that do not end in 0xff the result is simply
// key with its last byte incremented.
func incrementLastByte(key string) string {
	bb := []byte(key)
	for i := len(bb) - 1; i >= 0; i-- {
		if bb[i] < 0xff {
			bb[i]++
			return string(bb[:i+1])
		}
	}
	return "\x00"
}