banyand/metadata/client.go (302 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package metadata
import (
"context"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/pkg/errors"
"go.uber.org/multierr"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/apache/skywalking-banyandb/api/common"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
)
const (
// DefaultNamespace is the default namespace of the metadata stored in etcd.
DefaultNamespace = "banyandb"
// FlagEtcdEndpointsName is the default flag name for etcd endpoints.
FlagEtcdEndpointsName = "etcd-endpoints"
)
const flagEtcdUsername = "etcd-username"
const flagEtcdPassword = "etcd-password"
const flagEtcdTLSCAFile = "etcd-tls-ca-file"
const flagEtcdTLSCertFile = "etcd-tls-cert-file"
const flagEtcdTLSKeyFile = "etcd-tls-key-file"
// NewClient returns a new metadata client.
func NewClient(toRegisterNode, forceRegisterNode bool) (Service, error) {
return &clientService{
closer: run.NewCloser(1),
forceRegisterNode: forceRegisterNode,
toRegisterNode: toRegisterNode,
}, nil
}
type clientService struct {
schemaRegistry schema.Registry
closer *run.Closer
nodeInfo *databasev1.Node
etcdTLSCertFile string
etcdPassword string
etcdTLSCAFile string
etcdUsername string
etcdTLSKeyFile string
namespace string
endpoints []string
registryTimeout time.Duration
etcdFullSyncInterval time.Duration
nodeInfoMux sync.Mutex
forceRegisterNode bool
toRegisterNode bool
}
func (s *clientService) SchemaRegistry() schema.Registry {
return s.schemaRegistry
}
func (s *clientService) FlagSet() *run.FlagSet {
fs := run.NewFlagSet("metadata")
fs.StringVar(&s.namespace, "namespace", DefaultNamespace, "The namespace of the metadata stored in etcd")
fs.StringSliceVar(&s.endpoints, FlagEtcdEndpointsName, []string{"http://localhost:2379"}, "A comma-delimited list of etcd endpoints")
fs.StringVar(&s.etcdUsername, flagEtcdUsername, "", "A username of etcd")
fs.StringVar(&s.etcdPassword, flagEtcdPassword, "", "A password of etcd user")
fs.StringVar(&s.etcdTLSCAFile, flagEtcdTLSCAFile, "", "Trusted certificate authority")
fs.StringVar(&s.etcdTLSCertFile, flagEtcdTLSCertFile, "", "Etcd client certificate")
fs.StringVar(&s.etcdTLSKeyFile, flagEtcdTLSKeyFile, "", "Private key for the etcd client certificate.")
fs.DurationVar(&s.registryTimeout, "node-registry-timeout", 2*time.Minute, "The timeout for the node registry")
fs.DurationVar(&s.etcdFullSyncInterval, "etcd-full-sync-interval", 30*time.Minute, "The interval for full sync etcd")
return fs
}
func (s *clientService) Validate() error {
if s.endpoints == nil {
return errors.New("endpoints is empty")
}
return nil
}
func (s *clientService) PreRun(ctx context.Context) error {
stopCh := make(chan struct{})
sn := make(chan os.Signal, 1)
l := logger.GetLogger(s.Name())
signal.Notify(sn,
syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
go func() {
select {
case si := <-sn:
logger.GetLogger(s.Name()).Info().Msgf("signal received: %s", si)
close(stopCh)
case <-s.closer.CloseNotify():
close(stopCh)
}
}()
for {
var err error
s.schemaRegistry, err = schema.NewEtcdSchemaRegistry(
schema.Namespace(s.namespace),
schema.ConfigureServerEndpoints(s.endpoints),
schema.ConfigureEtcdUser(s.etcdUsername, s.etcdPassword),
schema.ConfigureEtcdTLSCAFile(s.etcdTLSCAFile),
schema.ConfigureEtcdTLSCertAndKey(s.etcdTLSCertFile, s.etcdTLSKeyFile),
schema.ConfigureWatchCheckInterval(s.etcdFullSyncInterval),
)
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
select {
case <-stopCh:
return errors.New("pre-run interrupted")
case <-time.After(s.registryTimeout):
return errors.New("pre-run timeout")
case <-s.closer.CloseNotify():
return errors.New("pre-run interrupted")
default:
l.Warn().Strs("etcd-endpoints", s.endpoints).Msg("the schema registry init timeout, retrying...")
time.Sleep(time.Second)
continue
}
}
if err == nil {
break
}
return err
}
if !s.toRegisterNode {
return nil
}
val := ctx.Value(common.ContextNodeKey)
if val == nil {
return errors.New("node id is empty")
}
node := val.(common.Node)
val = ctx.Value(common.ContextNodeRolesKey)
if val == nil {
return errors.New("node roles is empty")
}
nodeRoles := val.([]databasev1.Role)
nodeInfo := &databasev1.Node{
Metadata: &commonv1.Metadata{
Name: node.NodeID,
},
GrpcAddress: node.GrpcAddress,
HttpAddress: node.HTTPAddress,
Roles: nodeRoles,
Labels: node.Labels,
CreatedAt: timestamppb.Now(),
}
for {
ctxCancelable, cancel := context.WithTimeout(ctx, time.Second*10)
err := s.schemaRegistry.RegisterNode(ctxCancelable, nodeInfo, s.forceRegisterNode)
cancel()
if errors.Is(err, schema.ErrGRPCAlreadyExists) ||
errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
// Log the specific error
l.Warn().Err(err).Strs("etcd-endpoints", s.endpoints).Msg("register node error")
select {
case <-stopCh:
return errors.New("register node interrupted")
case <-time.After(s.registryTimeout):
return errors.New("register node timeout")
case <-s.closer.CloseNotify():
return errors.New("register node interrupted")
default:
time.Sleep(time.Second)
continue
}
}
if err == nil {
l.Info().Stringer("info", nodeInfo).Msg("register node successfully")
s.nodeInfoMux.Lock()
s.nodeInfo = nodeInfo
s.nodeInfoMux.Unlock()
}
return err
}
}
func (s *clientService) Serve() run.StopNotify {
if s.schemaRegistry != nil {
s.schemaRegistry.StartWatcher()
}
return s.closer.CloseNotify()
}
func (s *clientService) GracefulStop() {
s.closer.Done()
s.closer.CloseThenWait()
if s.schemaRegistry != nil {
if err := s.schemaRegistry.Close(); err != nil {
logger.GetLogger(s.Name()).Error().Err(err).Msg("failed to close schema registry")
}
}
}
func (s *clientService) RegisterHandler(name string, kind schema.Kind, handler schema.EventHandler) {
s.schemaRegistry.RegisterHandler(name, kind, handler)
}
func (s *clientService) StreamRegistry() schema.Stream {
return s.schemaRegistry
}
func (s *clientService) IndexRuleRegistry() schema.IndexRule {
return s.schemaRegistry
}
func (s *clientService) IndexRuleBindingRegistry() schema.IndexRuleBinding {
return s.schemaRegistry
}
func (s *clientService) MeasureRegistry() schema.Measure {
return s.schemaRegistry
}
func (s *clientService) GroupRegistry() schema.Group {
return s.schemaRegistry
}
func (s *clientService) TopNAggregationRegistry() schema.TopNAggregation {
return s.schemaRegistry
}
func (s *clientService) NodeRegistry() schema.Node {
return s.schemaRegistry
}
func (s *clientService) PropertyRegistry() schema.Property {
return s.schemaRegistry
}
func (s *clientService) Name() string {
return "metadata"
}
func (s *clientService) Role() databasev1.Role {
return databasev1.Role_ROLE_META
}
func (s *clientService) IndexRules(ctx context.Context, subject *commonv1.Metadata) ([]*databasev1.IndexRule, error) {
bindings, err := s.schemaRegistry.ListIndexRuleBinding(ctx, schema.ListOpt{Group: subject.Group})
if err != nil {
return nil, err
}
now := time.Now()
foundRules := make([]string, 0)
for _, binding := range bindings {
if binding.GetBeginAt().AsTime().After(now) ||
binding.GetExpireAt().AsTime().Before(now) {
continue
}
sub := binding.GetSubject()
if sub.Name != subject.Name {
continue
}
foundRules = append(foundRules, binding.Rules...)
}
result := make([]*databasev1.IndexRule, 0, len(foundRules))
var indexRuleErr error
for _, rule := range foundRules {
r, getErr := s.schemaRegistry.GetIndexRule(ctx, &commonv1.Metadata{
Name: rule,
Group: subject.Group,
})
if getErr != nil {
indexRuleErr = multierr.Append(indexRuleErr, err)
continue
}
result = append(result, r)
}
return result, indexRuleErr
}
func (s *clientService) Subjects(ctx context.Context, indexRule *databasev1.IndexRule, catalog commonv1.Catalog) ([]schema.Spec, error) {
bindings, err := s.schemaRegistry.ListIndexRuleBinding(ctx, schema.ListOpt{Group: indexRule.GetMetadata().GetGroup()})
if err != nil {
return nil, err
}
now := time.Now()
var subjectErr error
foundSubjects := make([]schema.Spec, 0)
for _, binding := range bindings {
if binding.GetBeginAt().AsTime().After(now) ||
binding.GetExpireAt().AsTime().Before(now) {
continue
}
sub := binding.GetSubject()
if sub.GetCatalog() != catalog {
continue
}
if !contains(binding.GetRules(), indexRule.GetMetadata().GetName()) {
continue
}
switch catalog {
case commonv1.Catalog_CATALOG_STREAM:
stream, getErr := s.schemaRegistry.GetStream(ctx, &commonv1.Metadata{
Name: sub.GetName(),
Group: indexRule.GetMetadata().GetGroup(),
})
if getErr != nil {
subjectErr = multierr.Append(subjectErr, getErr)
}
foundSubjects = append(foundSubjects, stream)
case commonv1.Catalog_CATALOG_MEASURE:
measure, getErr := s.schemaRegistry.GetMeasure(ctx, &commonv1.Metadata{
Name: sub.GetName(),
Group: indexRule.GetMetadata().GetGroup(),
})
if getErr != nil {
subjectErr = multierr.Append(subjectErr, getErr)
}
foundSubjects = append(foundSubjects, measure)
default:
continue
}
}
return foundSubjects, subjectErr
}
func contains(s []string, e string) bool {
for _, a := range s {
if a == e {
return true
}
}
return false
}