banyand/metadata/schema/etcd.go (526 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package schema import ( "context" "crypto/tls" "fmt" "path" "sync" "time" "github.com/pkg/errors" "github.com/rs/zerolog" pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/client/pkg/v3/transport" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" ) const ( minCheckInterval = time.Second * 5 defaultCheckInterval = time.Minute * 10 ) var ( _ Stream = (*etcdSchemaRegistry)(nil) _ IndexRuleBinding = (*etcdSchemaRegistry)(nil) _ IndexRule = (*etcdSchemaRegistry)(nil) _ Measure = (*etcdSchemaRegistry)(nil) _ Group = (*etcdSchemaRegistry)(nil) errUnexpectedNumberOfEntities = errors.New("unexpected number of entities") errConcurrentModification = errors.New("concurrent modification of entities") ) // HasMetadata allows getting Metadata. type HasMetadata interface { GetMetadata() *commonv1.Metadata proto.Message } // RegistryOption is the option to create Registry. type RegistryOption func(*etcdSchemaRegistryConfig) // Namespace sets the namespace of the registry. func Namespace(namespace string) RegistryOption { return func(config *etcdSchemaRegistryConfig) { config.namespace = namespace } } // ConfigureServerEndpoints sets a list of the server urls. func ConfigureServerEndpoints(url []string) RegistryOption { return func(config *etcdSchemaRegistryConfig) { config.serverEndpoints = url } } // ConfigureEtcdUser sets a username & password of the etcd. func ConfigureEtcdUser(username string, password string) RegistryOption { return func(config *etcdSchemaRegistryConfig) { if username != "" && password != "" { config.username = username config.password = password } } } // ConfigureEtcdTLSCAFile sets a trusted ca file of the etcd tls config. func ConfigureEtcdTLSCAFile(file string) RegistryOption { return func(config *etcdSchemaRegistryConfig) { config.tlsCAFile = file } } // ConfigureEtcdTLSCertAndKey sets a cert & key of the etcd tls config. func ConfigureEtcdTLSCertAndKey(certFile string, keyFile string) RegistryOption { return func(config *etcdSchemaRegistryConfig) { if certFile != "" && keyFile != "" { config.tlsCertFile = certFile config.tlsKeyFile = keyFile } } } // ConfigureWatchCheckInterval sets the interval to check the watcher. func ConfigureWatchCheckInterval(d time.Duration) RegistryOption { return func(config *etcdSchemaRegistryConfig) { if d >= minCheckInterval { config.checkInterval = d } } } // CheckInterval sets the interval to check the watcher. func CheckInterval(d time.Duration) WatcherOption { return func(wc *watcherConfig) { if d >= minCheckInterval { wc.checkInterval = d } } } type etcdSchemaRegistry struct { client *clientv3.Client closer *run.Closer l *logger.Logger watchers map[Kind]*watcher namespace string checkInterval time.Duration mux sync.RWMutex } type etcdSchemaRegistryConfig struct { namespace string username string password string tlsCAFile string tlsCertFile string tlsKeyFile string serverEndpoints []string checkInterval time.Duration } func (e *etcdSchemaRegistry) RegisterHandler(name string, kind Kind, handler EventHandler) { // Validate kind if kind&KindMask != kind { panic(fmt.Sprintf("invalid kind %d", kind)) } e.mux.Lock() defer e.mux.Unlock() var kinds []Kind for i := 0; i < KindSize; i++ { ki := Kind(1 << i) if kind&ki > 0 { kinds = append(kinds, ki) } } e.l.Info().Str("name", name).Interface("kinds", kinds).Msg("initializing schema cache") ok, revisions := handler.OnInit(kinds) if ok { if len(revisions) != len(kinds) { logger.Panicf("invalid number of revisions for %s", name) return } for i := range kinds { e.registerToWatcher(name, kinds[i], revisions[i], handler) } return } for i := range kinds { e.registerToWatcher(name, kinds[i], -1, handler) } } func (e *etcdSchemaRegistry) registerToWatcher(name string, kind Kind, revision int64, handler EventHandler) { if w, ok := e.watchers[kind]; ok { e.l.Info().Str("name", name).Stringer("kind", kind).Msg("registering to an existing watcher") w.AddHandler(handler) if w.revision > revision { w.revision = revision } return } e.l.Info().Str("name", name).Stringer("kind", kind).Msg("registering to a new watcher") w := e.NewWatcher(name, kind, revision, CheckInterval(e.checkInterval)) w.AddHandler(handler) e.watchers[kind] = w } func (e *etcdSchemaRegistry) Compact(ctx context.Context, revision int64) error { if !e.closer.AddRunning() { return ErrClosed } defer e.closer.Done() _, err := e.client.Compact(ctx, revision) return err } func (e *etcdSchemaRegistry) StartWatcher() { e.mux.RLock() defer e.mux.RUnlock() for _, w := range e.watchers { w.Start() } } func (e *etcdSchemaRegistry) Close() error { e.closer.Done() e.closer.CloseThenWait() e.mux.RLock() defer e.mux.RUnlock() for i := range e.watchers { e.watchers[i].Close() } return e.client.Close() } // NewEtcdSchemaRegistry returns a Registry powered by Etcd. func NewEtcdSchemaRegistry(options ...RegistryOption) (Registry, error) { registryConfig := &etcdSchemaRegistryConfig{} for _, opt := range options { opt(registryConfig) } if registryConfig.serverEndpoints == nil { return nil, errors.New("server address is not set") } log := logger.GetLogger("etcd-client").DefaultLevel(zerolog.ErrorLevel) zapCfg := log.ToZapConfig() var l *zap.Logger var err error if l, err = zapCfg.Build(); err != nil { return nil, err } config := clientv3.Config{ Endpoints: registryConfig.serverEndpoints, DialTimeout: 5 * time.Second, DialKeepAliveTime: 30 * time.Second, DialKeepAliveTimeout: 10 * time.Second, AutoSyncInterval: 5 * time.Minute, Logger: l, Username: registryConfig.username, Password: registryConfig.password, TLS: extractTLSConfig(registryConfig), } client, err := clientv3.New(config) if err != nil { return nil, err } reg := &etcdSchemaRegistry{ namespace: registryConfig.namespace, client: client, closer: run.NewCloser(1), l: logger.GetLogger("schema-registry"), checkInterval: registryConfig.checkInterval, watchers: make(map[Kind]*watcher), } return reg, nil } func (e *etcdSchemaRegistry) prependNamespace(key string) string { if e.namespace == "" { return key } return path.Join("/", e.namespace, key) } func (e *etcdSchemaRegistry) get(ctx context.Context, key string, message proto.Message) error { if !e.closer.AddRunning() { return ErrClosed } defer e.closer.Done() key = e.prependNamespace(key) resp, err := e.client.Get(ctx, key) if err != nil { return err } if resp.Count == 0 { return ErrGRPCResourceNotFound } if resp.Count > 1 { return errUnexpectedNumberOfEntities } if err = proto.Unmarshal(resp.Kvs[0].Value, message); err != nil { return err } if messageWithMetadata, ok := message.(HasMetadata); ok { // Assign readonly fields messageWithMetadata.GetMetadata().CreateRevision = resp.Kvs[0].CreateRevision messageWithMetadata.GetMetadata().ModRevision = resp.Kvs[0].ModRevision } return nil } // update will first ensure the existence of the entity with the metadata, // and overwrite the existing value if so. // Otherwise, it will return ErrGRPCResourceNotFound. func (e *etcdSchemaRegistry) update(ctx context.Context, metadata Metadata) (int64, error) { if !e.closer.AddRunning() { return 0, ErrClosed } defer e.closer.Done() key, err := metadata.key() if err != nil { return 0, err } key = e.prependNamespace(key) getResp, err := e.client.Get(ctx, key) if err != nil { return 0, err } if getResp.Count > 1 { return 0, errUnexpectedNumberOfEntities } val, err := proto.Marshal(metadata.Spec.(proto.Message)) if err != nil { return 0, err } replace := getResp.Count > 0 if !replace { return 0, ErrGRPCResourceNotFound } existingVal, innerErr := metadata.Kind.Unmarshal(getResp.Kvs[0]) if innerErr != nil { return 0, innerErr } // directly return if we have the same entity if metadata.equal(existingVal) { return 0, nil } modRevision := metadata.ModRevision if modRevision == 0 { modRevision = getResp.Kvs[0].ModRevision } txnResp, txnErr := e.client.Txn(ctx). If(clientv3.Compare(clientv3.ModRevision(key), "=", modRevision)). Then(clientv3.OpPut(key, string(val))). Commit() if txnErr != nil { return 0, txnErr } if !txnResp.Succeeded { return 0, errConcurrentModification } return txnResp.Responses[0].GetResponsePut().Header.Revision, nil } // create will first check existence of the entity with the metadata, // and put the value if it does not exist. // Otherwise, it will return ErrGRPCAlreadyExists. func (e *etcdSchemaRegistry) create(ctx context.Context, metadata Metadata) (int64, error) { if !e.closer.AddRunning() { return 0, ErrClosed } defer e.closer.Done() key, err := metadata.key() if err != nil { return 0, err } key = e.prependNamespace(key) getResp, err := e.client.Get(ctx, key) if err != nil { return 0, err } if getResp.Count > 1 { return 0, errUnexpectedNumberOfEntities } val, err := proto.Marshal(metadata.Spec.(proto.Message)) if err != nil { return 0, err } replace := getResp.Count > 0 if replace { return 0, ErrGRPCAlreadyExists } putResp, err := e.client.Put(ctx, key, string(val)) if err != nil { s, ok := status.FromError(err) if ok && s.Code() == codes.AlreadyExists { return 0, ErrGRPCAlreadyExists } return 0, err } return putResp.Header.Revision, nil } func (e *etcdSchemaRegistry) listWithPrefix(ctx context.Context, prefix string, kind Kind) ([]proto.Message, error) { if !e.closer.AddRunning() { return nil, ErrClosed } defer e.closer.Done() prefix = e.prependNamespace(prefix) resp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix()) if err != nil { return nil, err } entities := make([]proto.Message, resp.Count) for i := int64(0); i < resp.Count; i++ { md, err := kind.Unmarshal(resp.Kvs[i]) if err != nil { return nil, err } entities[i] = md.Spec.(proto.Message) } return entities, nil } func (e *etcdSchemaRegistry) delete(ctx context.Context, metadata Metadata) (bool, error) { if !e.closer.AddRunning() { return false, ErrClosed } defer e.closer.Done() key, err := metadata.key() if err != nil { return false, err } key = e.prependNamespace(key) resp, err := e.client.Delete(ctx, key, clientv3.WithPrevKV()) if err != nil { return false, err } if resp.Deleted == 1 { return true, nil } return false, nil } const leaseDuration = 5 * time.Second func (e *etcdSchemaRegistry) Register(ctx context.Context, metadata Metadata, forced bool) error { if !e.closer.AddRunning() { return ErrClosed } defer e.closer.Done() key, err := e.prepareKey(metadata) if err != nil { return err } val, err := e.prepareValue(metadata) if err != nil { return err } lease, err := e.client.Grant(ctx, int64(leaseDuration.Seconds())) if err != nil { return errors.WithMessagef(err, "failed to grant lease for key %s", key) } if err := e.putKeyVal(ctx, key, val, lease, forced); err != nil { return err } //nolint:contextcheck if err := e.keepLeaseAlive(lease, key, val); err != nil { return fmt.Errorf("failed to keep lease alive for key %s: %w", key, err) } return nil } func (e *etcdSchemaRegistry) prepareKey(metadata Metadata) (string, error) { key, err := metadata.key() if err != nil { return "", err } return e.prependNamespace(key), nil } func (e *etcdSchemaRegistry) prepareValue(metadata Metadata) (string, error) { val, err := proto.Marshal(metadata.Spec.(proto.Message)) if err != nil { return "", err } return string(val), nil } func (e *etcdSchemaRegistry) putKeyVal(ctx context.Context, key, val string, lease *clientv3.LeaseGrantResponse, forced bool) error { if forced { if _, err := e.client.Put(ctx, key, val, clientv3.WithLease(lease.ID)); err != nil { return fmt.Errorf("failed to forcefully put key-value pair for key %s: %w", key, err) } } else { ops := []clientv3.Cmp{clientv3.Compare(clientv3.CreateRevision(key), "=", 0)} txn := e.client.Txn(ctx).If(ops...) txn = txn.Then(clientv3.OpPut(key, val, clientv3.WithLease(lease.ID))) txn = txn.Else(clientv3.OpGet(key)) response, err := txn.Commit() if err != nil { return fmt.Errorf("failed to commit transaction for key %s: %w", key, err) } if !response.Succeeded { tr := pb.TxnResponse(*response) return errors.Wrapf(ErrGRPCAlreadyExists, "key %s, response: %s", key, tr.String()) } } return nil } func (e *etcdSchemaRegistry) keepLeaseAlive(lease *clientv3.LeaseGrantResponse, key, val string) error { keepAliveChan, err := e.client.KeepAlive(context.Background(), lease.ID) if err != nil { return fmt.Errorf("failed to keep lease alive for key %s: %w", key, err) } go func() { if !e.closer.AddRunning() { return } defer func() { e.revokeLease(lease) e.closer.Done() }() for { select { case <-e.closer.CloseNotify(): return case keepAliveResp := <-keepAliveChan: if keepAliveResp == nil { keepAliveChan = e.revokeAndReconnectLease(lease, key, val) } } } }() return nil } func (e *etcdSchemaRegistry) revokeAndReconnectLease(lease *clientv3.LeaseGrantResponse, key, val string) <-chan *clientv3.LeaseKeepAliveResponse { for { e.revokeLease(lease) select { case <-e.closer.CloseNotify(): return nil default: lease, err := e.client.Grant(context.Background(), int64(leaseDuration.Seconds())) if err != nil { e.l.Error().Err(err).Msg("failed to grant lease") time.Sleep(leaseDuration) continue } _, err = e.client.Put(context.Background(), key, val, clientv3.WithLease(lease.ID)) if err != nil { e.l.Error().Err(err).Msg("failed to put key-value pair") time.Sleep(leaseDuration) continue } keepAliveChan, err := e.client.KeepAlive(context.Background(), lease.ID) if err != nil { e.l.Error().Err(err).Msg("failed to keep alive") time.Sleep(leaseDuration) } else { return keepAliveChan } } } } func (e *etcdSchemaRegistry) revokeLease(lease *clientv3.LeaseGrantResponse) { if lease == nil { return } ctx, cancel := context.WithTimeout(context.Background(), leaseDuration) defer cancel() _, err := e.client.Lease.Revoke(ctx, lease.ID) if err != nil && e.l.Debug().Enabled() { e.l.Debug().Err(err).Msgf("failed to revoke lease %d", lease.ID) } } func (e *etcdSchemaRegistry) NewWatcher(name string, kind Kind, revision int64, opts ...WatcherOption) *watcher { wc := watcherConfig{ key: e.prependNamespace(kind.key()), kind: kind, revision: revision, checkInterval: 5 * time.Minute, // Default value } for _, opt := range opts { opt(&wc) } return newWatcher(e.client, wc, e.l.Named(fmt.Sprintf("watcher-%s[%s]", name, kind.String()))) } func listPrefixesForEntity(group, entityPrefix string) string { return path.Join(entityPrefix, group) } func formatKey(entityPrefix string, metadata *commonv1.Metadata) string { return path.Join( listPrefixesForEntity(metadata.GetGroup(), entityPrefix), metadata.GetName()) } func extractTLSConfig(cfg *etcdSchemaRegistryConfig) *tls.Config { if cfg.tlsCAFile == "" && cfg.tlsCertFile == "" && cfg.tlsKeyFile == "" { return nil } tlsInfo := transport.TLSInfo{ TrustedCAFile: cfg.tlsCAFile, CertFile: cfg.tlsCertFile, KeyFile: cfg.tlsKeyFile, } tlsConfig, err := tlsInfo.ClientConfig() if err != nil { return nil } return tlsConfig }