banyand/liaison/grpc/discovery.go (272 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package grpc import ( "context" "fmt" "sync" "time" "github.com/pkg/errors" "github.com/apache/skywalking-banyandb/api/common" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/partition" ) var errNotExist = errors.New("the object doesn't exist") type discoveryService struct { pipeline queue.Queue metadataRepo metadata.Repo shardRepo *shardRepo entityRepo *entityRepo log *logger.Logger kind schema.Kind } func newDiscoveryService(pipeline queue.Queue, kind schema.Kind, metadataRepo metadata.Repo) *discoveryService { sr := &shardRepo{shardEventsMap: make(map[identity]uint32)} er := &entityRepo{entitiesMap: make(map[identity]partition.EntityLocator)} return &discoveryService{ shardRepo: sr, entityRepo: er, pipeline: pipeline, kind: kind, metadataRepo: metadataRepo, } } func (ds *discoveryService) initialize() error { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) groups, err := ds.metadataRepo.GroupRegistry().ListGroup(ctx) cancel() if err != nil { return err } for _, g := range groups { switch ds.kind { case schema.KindMeasure: case schema.KindStream: default: continue } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) shards, innerErr := ds.metadataRepo.ShardRegistry().ListShard(ctx, schema.ListOpt{Group: g.Metadata.Name}) cancel() if innerErr != nil { return innerErr } for _, s := range shards { ds.shardRepo.OnAddOrUpdate(schema.Metadata{ TypeMeta: schema.TypeMeta{ Kind: schema.KindShard, Name: s.Metadata.Name, Group: s.Metadata.Group, }, Spec: s, }) } switch ds.kind { case schema.KindMeasure: ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) mm, innerErr := ds.metadataRepo.MeasureRegistry().ListMeasure(ctx, schema.ListOpt{Group: g.Metadata.Name}) cancel() if innerErr != nil { return innerErr } for _, m := range mm { ds.entityRepo.OnAddOrUpdate(schema.Metadata{ TypeMeta: schema.TypeMeta{ Kind: schema.KindMeasure, Name: m.Metadata.Name, Group: m.Metadata.Group, }, Spec: m, }) } case schema.KindStream: ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) ss, innerErr := ds.metadataRepo.StreamRegistry().ListStream(ctx, schema.ListOpt{Group: g.Metadata.Name}) cancel() if innerErr != nil { return innerErr } for _, s := range ss { ds.entityRepo.OnAddOrUpdate(schema.Metadata{ TypeMeta: schema.TypeMeta{ Kind: schema.KindStream, Name: s.Metadata.Name, Group: s.Metadata.Group, }, Spec: s, }) } default: return fmt.Errorf("unsupported kind: %d", ds.kind) } } ds.metadataRepo.RegisterHandler(schema.KindShard, ds.shardRepo) ds.metadataRepo.RegisterHandler(ds.kind, ds.entityRepo) return nil } func (ds *discoveryService) SetLogger(log *logger.Logger) { ds.log = log ds.shardRepo.log = log ds.entityRepo.log = log } func (ds *discoveryService) navigate(metadata *commonv1.Metadata, tagFamilies []*modelv1.TagFamilyForWrite) (tsdb.Entity, tsdb.EntityValues, common.ShardID, error) { shardNum, existed := ds.shardRepo.shardNum(getID(&commonv1.Metadata{ Name: metadata.Group, })) if !existed { return nil, nil, common.ShardID(0), errors.Wrapf(errNotExist, "finding the shard num by: %v", metadata) } locator, existed := ds.entityRepo.getLocator(getID(metadata)) if !existed { return nil, nil, common.ShardID(0), errors.Wrapf(errNotExist, "finding the locator by: %v", metadata) } return locator.Locate(metadata.Name, tagFamilies, shardNum) } type identity struct { name string group string } func (i identity) String() string { return fmt.Sprintf("%s/%s", i.group, i.name) } var _ schema.EventHandler = (*shardRepo)(nil) type shardRepo struct { log *logger.Logger shardEventsMap map[identity]uint32 sync.RWMutex } // OnAddOrUpdate implements schema.EventHandler. func (s *shardRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) { if schemaMetadata.Kind != schema.KindShard { return } shard := schemaMetadata.Spec.(*databasev1.Shard) idx := getID(shard.GetMetadata()) if le := s.log.Debug(); le.Enabled() { le.Stringer("id", idx).Uint32("total", shard.Total).Msg("shard added or updated") } s.RWMutex.Lock() defer s.RWMutex.Unlock() s.shardEventsMap[idx] = shard.Total } // OnDelete implements schema.EventHandler. func (s *shardRepo) OnDelete(schemaMetadata schema.Metadata) { if schemaMetadata.Kind != schema.KindShard { return } shard := schemaMetadata.Spec.(*databasev1.Shard) idx := getID(shard.GetMetadata()) if le := s.log.Debug(); le.Enabled() { le.Stringer("id", idx).Msg("shard deleted") } s.RWMutex.Lock() defer s.RWMutex.Unlock() delete(s.shardEventsMap, idx) } func (s *shardRepo) shardNum(idx identity) (uint32, bool) { s.RWMutex.RLock() defer s.RWMutex.RUnlock() sn, ok := s.shardEventsMap[idx] if !ok { return 0, false } return sn, true } func getID(metadata *commonv1.Metadata) identity { return identity{ name: metadata.GetName(), group: metadata.GetGroup(), } } var _ schema.EventHandler = (*entityRepo)(nil) type entityRepo struct { log *logger.Logger entitiesMap map[identity]partition.EntityLocator sync.RWMutex } // OnAddOrUpdate implements schema.EventHandler. func (e *entityRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) { var el partition.EntityLocator var id identity switch schemaMetadata.Kind { case schema.KindMeasure: measure := schemaMetadata.Spec.(*databasev1.Measure) el = partition.NewEntityLocator(measure.TagFamilies, measure.Entity) id = getID(measure.GetMetadata()) case schema.KindStream: stream := schemaMetadata.Spec.(*databasev1.Stream) el = partition.NewEntityLocator(stream.TagFamilies, stream.Entity) id = getID(stream.GetMetadata()) default: return } if le := e.log.Debug(); le.Enabled() { var kind string switch schemaMetadata.Kind { case schema.KindMeasure: kind = "measure" case schema.KindStream: kind = "stream" default: kind = "unknown" } le. Str("action", "add_or_update"). Stringer("subject", id). Str("kind", kind). Msg("entity added or updated") } en := make(partition.EntityLocator, 0, len(el)) for _, l := range el { en = append(en, partition.TagLocator{ FamilyOffset: l.FamilyOffset, TagOffset: l.TagOffset, }) } e.RWMutex.Lock() defer e.RWMutex.Unlock() e.entitiesMap[id] = en } // OnDelete implements schema.EventHandler. func (e *entityRepo) OnDelete(schemaMetadata schema.Metadata) { var id identity switch schemaMetadata.Kind { case schema.KindMeasure: measure := schemaMetadata.Spec.(*databasev1.Measure) id = getID(measure.GetMetadata()) case schema.KindStream: stream := schemaMetadata.Spec.(*databasev1.Stream) id = getID(stream.GetMetadata()) default: return } if le := e.log.Debug(); le.Enabled() { var kind string switch schemaMetadata.Kind { case schema.KindMeasure: kind = "measure" case schema.KindStream: kind = "stream" default: kind = "unknown" } le. Str("action", "delete"). Stringer("subject", id). Str("kind", kind). Msg("entity deleted") } e.RWMutex.Lock() defer e.RWMutex.Unlock() delete(e.entitiesMap, id) } func (e *entityRepo) getLocator(id identity) (partition.EntityLocator, bool) { e.RWMutex.RLock() defer e.RWMutex.RUnlock() el, ok := e.entitiesMap[id] if !ok { return nil, false } return el, true }