banyand/liaison/grpc/discovery.go (280 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package grpc import ( "fmt" "sync" "github.com/pkg/errors" "github.com/apache/skywalking-banyandb/api/common" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/partition" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" ) var errNotExist = errors.New("the object doesn't exist") type discoveryService struct { metadataRepo metadata.Repo nodeRegistry NodeRegistry groupRepo *groupRepo entityRepo *entityRepo shardingKeyRepo *shardingKeyRepo log *logger.Logger kind schema.Kind } func newDiscoveryService(kind schema.Kind, metadataRepo metadata.Repo, nodeRegistry NodeRegistry) *discoveryService { gr := &groupRepo{resourceOpts: make(map[string]*commonv1.ResourceOpts)} er := &entityRepo{entitiesMap: make(map[identity]partition.Locator)} sr := &shardingKeyRepo{shardingKeysMap: make(map[identity]partition.Locator)} return &discoveryService{ groupRepo: gr, entityRepo: er, shardingKeyRepo: sr, kind: kind, metadataRepo: metadataRepo, nodeRegistry: nodeRegistry, } } func (ds *discoveryService) initialize() error { ds.metadataRepo.RegisterHandler("liaison", schema.KindGroup, ds.groupRepo) ds.metadataRepo.RegisterHandler("liaison", ds.kind, ds.entityRepo) if ds.kind == schema.KindMeasure { ds.metadataRepo.RegisterHandler("liaison", ds.kind, ds.shardingKeyRepo) } return nil } func (ds *discoveryService) SetLogger(log *logger.Logger) { ds.log = log ds.groupRepo.log = log ds.entityRepo.log = log ds.shardingKeyRepo.log = log } func (ds *discoveryService) navigate(metadata *commonv1.Metadata, tagFamilies []*modelv1.TagFamilyForWrite) (pbv1.Entity, pbv1.EntityValues, common.ShardID, error) { shardNum, existed := ds.groupRepo.shardNum(metadata.Group) if !existed { return nil, nil, common.ShardID(0), errors.Wrapf(errNotExist, "finding the shard num by: %v", metadata) } id := getID(metadata) entityLocator, existed := ds.entityRepo.getLocator(id) if !existed { return nil, nil, common.ShardID(0), errors.Wrapf(errNotExist, "finding the entity locator by: %v", metadata) } entity, entityValues, shardID, err := entityLocator.Locate(metadata.Name, tagFamilies, shardNum) if err != nil { return nil, nil, common.ShardID(0), err } shardingKeyLocator, existed := ds.shardingKeyRepo.getLocator(id) if !existed { return entity, entityValues, shardID, nil } _, _, shardID, err = shardingKeyLocator.Locate(metadata.Name, tagFamilies, shardNum) if err != nil { return nil, nil, common.ShardID(0), err } return entity, entityValues, shardID, nil } type identity struct { name string group string } func (i identity) String() string { return fmt.Sprintf("%s/%s", i.group, i.name) } var _ schema.EventHandler = (*groupRepo)(nil) type groupRepo struct { schema.UnimplementedOnInitHandler log *logger.Logger resourceOpts map[string]*commonv1.ResourceOpts sync.RWMutex } func (s *groupRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) { if schemaMetadata.Kind != schema.KindGroup { return } group := schemaMetadata.Spec.(*commonv1.Group) if group.ResourceOpts == nil || group.Catalog == commonv1.Catalog_CATALOG_UNSPECIFIED { return } if le := s.log.Debug(); le.Enabled() { le.Stringer("id", group.Metadata).Uint32("total", group.ResourceOpts.ShardNum).Msg("shard added or updated") } s.RWMutex.Lock() defer s.RWMutex.Unlock() s.resourceOpts[group.Metadata.GetName()] = group.ResourceOpts } func (s *groupRepo) OnDelete(schemaMetadata schema.Metadata) { if schemaMetadata.Kind != schema.KindGroup { return } group := schemaMetadata.Spec.(*commonv1.Group) if group.ResourceOpts == nil || group.Catalog == commonv1.Catalog_CATALOG_UNSPECIFIED { return } if le := s.log.Debug(); le.Enabled() { le.Stringer("id", group.Metadata).Msg("shard deleted") } s.RWMutex.Lock() defer s.RWMutex.Unlock() delete(s.resourceOpts, group.Metadata.GetName()) } func (s *groupRepo) shardNum(groupName string) (uint32, bool) { s.RWMutex.RLock() defer s.RWMutex.RUnlock() r, ok := s.resourceOpts[groupName] if !ok { return 0, false } return r.ShardNum, true } func getID(metadata *commonv1.Metadata) identity { return identity{ name: metadata.GetName(), group: metadata.GetGroup(), } } var _ schema.EventHandler = (*entityRepo)(nil) type entityRepo struct { schema.UnimplementedOnInitHandler log *logger.Logger entitiesMap map[identity]partition.Locator sync.RWMutex } // OnAddOrUpdate implements schema.EventHandler. func (e *entityRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) { var l partition.Locator var id identity var modRevision int64 switch schemaMetadata.Kind { case schema.KindMeasure: measure := schemaMetadata.Spec.(*databasev1.Measure) modRevision = measure.GetMetadata().GetModRevision() l = partition.NewEntityLocator(measure.TagFamilies, measure.Entity, modRevision) id = getID(measure.GetMetadata()) case schema.KindStream: stream := schemaMetadata.Spec.(*databasev1.Stream) modRevision = stream.GetMetadata().GetModRevision() l = partition.NewEntityLocator(stream.TagFamilies, stream.Entity, modRevision) id = getID(stream.GetMetadata()) default: return } if le := e.log.Debug(); le.Enabled() { var kind string switch schemaMetadata.Kind { case schema.KindMeasure: kind = "measure" case schema.KindStream: kind = "stream" default: kind = "unknown" } le. Str("action", "add_or_update"). Stringer("subject", id). Str("kind", kind). Msg("entity added or updated") } e.RWMutex.Lock() defer e.RWMutex.Unlock() e.entitiesMap[id] = partition.Locator{TagLocators: l.TagLocators, ModRevision: modRevision} } // OnDelete implements schema.EventHandler. func (e *entityRepo) OnDelete(schemaMetadata schema.Metadata) { var id identity switch schemaMetadata.Kind { case schema.KindMeasure: measure := schemaMetadata.Spec.(*databasev1.Measure) id = getID(measure.GetMetadata()) case schema.KindStream: stream := schemaMetadata.Spec.(*databasev1.Stream) id = getID(stream.GetMetadata()) default: return } if le := e.log.Debug(); le.Enabled() { var kind string switch schemaMetadata.Kind { case schema.KindMeasure: kind = "measure" case schema.KindStream: kind = "stream" default: kind = "unknown" } le. Str("action", "delete"). Stringer("subject", id). Str("kind", kind). Msg("entity deleted") } e.RWMutex.Lock() defer e.RWMutex.Unlock() delete(e.entitiesMap, id) } func (e *entityRepo) getLocator(id identity) (partition.Locator, bool) { e.RWMutex.RLock() defer e.RWMutex.RUnlock() el, ok := e.entitiesMap[id] if !ok { return partition.Locator{}, false } return el, true } var _ schema.EventHandler = (*shardingKeyRepo)(nil) type shardingKeyRepo struct { schema.UnimplementedOnInitHandler log *logger.Logger shardingKeysMap map[identity]partition.Locator sync.RWMutex } // OnAddOrUpdate implements schema.EventHandler. func (s *shardingKeyRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) { if schemaMetadata.Kind != schema.KindMeasure { return } measure := schemaMetadata.Spec.(*databasev1.Measure) shardingKey := measure.GetShardingKey() if shardingKey == nil || len(shardingKey.GetTagNames()) == 0 { return } l := partition.NewShardingKeyLocator(measure.TagFamilies, measure.ShardingKey) id := getID(measure.GetMetadata()) if le := s.log.Debug(); le.Enabled() { le. Str("action", "add_or_update"). Stringer("subject", id). Str("kind", "measure"). Msg("sharding key added or updated") } s.RWMutex.Lock() defer s.RWMutex.Unlock() s.shardingKeysMap[id] = partition.Locator{TagLocators: l.TagLocators} } // OnDelete implements schema.EventHandler. func (s *shardingKeyRepo) OnDelete(schemaMetadata schema.Metadata) { if schemaMetadata.Kind != schema.KindMeasure { return } measure := schemaMetadata.Spec.(*databasev1.Measure) shardingKey := measure.GetShardingKey() if shardingKey == nil || len(shardingKey.GetTagNames()) == 0 { return } id := getID(measure.GetMetadata()) if le := s.log.Debug(); le.Enabled() { le. Str("action", "delete"). Stringer("subject", id). Str("kind", "measure"). Msg("sharding key deleted") } s.RWMutex.Lock() defer s.RWMutex.Unlock() delete(s.shardingKeysMap, id) } func (s *shardingKeyRepo) getLocator(id identity) (partition.Locator, bool) { s.RWMutex.RLock() defer s.RWMutex.RUnlock() sl, ok := s.shardingKeysMap[id] if !ok { return partition.Locator{}, false } return sl, true }