banyand/stream/metadata.go (197 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package stream import ( "context" "path" "time" "github.com/apache/skywalking-banyandb/api/common" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/logger" pb_v1 "github.com/apache/skywalking-banyandb/pkg/pb/v1/tsdb" resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema" ) type schemaRepo struct { resourceSchema.Repository l *logger.Logger metadata metadata.Repo } func newSchemaRepo(path string, metadata metadata.Repo, bufferSize int64, dbOpts tsdb.DatabaseOpts, l *logger.Logger, ) schemaRepo { return schemaRepo{ l: l, metadata: metadata, Repository: resourceSchema.NewRepository( metadata, l, newSupplier(path, metadata, bufferSize, dbOpts, l), ), } } func (sr *schemaRepo) OnAddOrUpdate(m schema.Metadata) { switch m.Kind { case schema.KindGroup: g := m.Spec.(*commonv1.Group) if g.Catalog != commonv1.Catalog_CATALOG_STREAM { return } sr.SendMetadataEvent(resourceSchema.MetadataEvent{ Typ: resourceSchema.EventAddOrUpdate, Kind: resourceSchema.EventKindGroup, Metadata: g.GetMetadata(), }) case schema.KindStream: sr.SendMetadataEvent(resourceSchema.MetadataEvent{ Typ: resourceSchema.EventAddOrUpdate, Kind: resourceSchema.EventKindResource, Metadata: m.Spec.(*databasev1.Stream).GetMetadata(), }) case schema.KindIndexRuleBinding: irb, ok := m.Spec.(*databasev1.IndexRuleBinding) if !ok { sr.l.Warn().Msg("fail to convert message to IndexRuleBinding") return } if irb.GetSubject().Catalog == commonv1.Catalog_CATALOG_STREAM { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) stm, err := sr.metadata.StreamRegistry().GetStream(ctx, &commonv1.Metadata{ Name: irb.GetSubject().GetName(), Group: m.Group, }) cancel() if err != nil { sr.l.Error().Err(err).Msg("fail to get subject") return } sr.SendMetadataEvent(resourceSchema.MetadataEvent{ Typ: resourceSchema.EventAddOrUpdate, Kind: resourceSchema.EventKindResource, Metadata: stm.GetMetadata(), }) } case schema.KindIndexRule: ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) subjects, err := sr.metadata.Subjects(ctx, m.Spec.(*databasev1.IndexRule), commonv1.Catalog_CATALOG_STREAM) cancel() if err != nil { sr.l.Error().Err(err).Msg("fail to get subjects(stream)") return } for _, sub := range subjects { sr.SendMetadataEvent(resourceSchema.MetadataEvent{ Typ: resourceSchema.EventAddOrUpdate, Kind: resourceSchema.EventKindResource, Metadata: sub.(*databasev1.Stream).GetMetadata(), }) } default: } } func (sr *schemaRepo) OnDelete(m schema.Metadata) { switch m.Kind { case schema.KindGroup: g := m.Spec.(*commonv1.Group) if g.Catalog != commonv1.Catalog_CATALOG_STREAM { return } sr.SendMetadataEvent(resourceSchema.MetadataEvent{ Typ: resourceSchema.EventDelete, Kind: resourceSchema.EventKindGroup, Metadata: g.GetMetadata(), }) case schema.KindStream: sr.SendMetadataEvent(resourceSchema.MetadataEvent{ Typ: resourceSchema.EventDelete, Kind: resourceSchema.EventKindResource, Metadata: m.Spec.(*databasev1.Stream).GetMetadata(), }) case schema.KindIndexRuleBinding: if m.Spec.(*databasev1.IndexRuleBinding).GetSubject().Catalog == commonv1.Catalog_CATALOG_STREAM { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) stm, err := sr.metadata.StreamRegistry().GetStream(ctx, &commonv1.Metadata{ Name: m.Name, Group: m.Group, }) cancel() if err != nil { sr.l.Error().Err(err).Msg("fail to get subject") return } sr.SendMetadataEvent(resourceSchema.MetadataEvent{ Typ: resourceSchema.EventDelete, Kind: resourceSchema.EventKindResource, Metadata: stm.GetMetadata(), }) } case schema.KindIndexRule: default: } } func (sr *schemaRepo) loadStream(metadata *commonv1.Metadata) (*stream, bool) { r, ok := sr.LoadResource(metadata) if !ok { return nil, false } s, ok := r.(*stream) return s, ok } var _ resourceSchema.ResourceSupplier = (*supplier)(nil) type supplier struct { metadata metadata.Repo l *logger.Logger path string dbOpts tsdb.DatabaseOpts bufferSize int64 } func newSupplier(path string, metadata metadata.Repo, bufferSize int64, dbOpts tsdb.DatabaseOpts, l *logger.Logger) *supplier { return &supplier{ path: path, bufferSize: bufferSize, dbOpts: dbOpts, metadata: metadata, l: l, } } func (s *supplier) OpenResource(shardNum uint32, db tsdb.Supplier, spec resourceSchema.ResourceSpec) (resourceSchema.Resource, error) { streamSchema := spec.Schema.(*databasev1.Stream) return openStream(shardNum, db, streamSpec{ schema: streamSchema, indexRules: spec.IndexRules, }, s.l), nil } func (s *supplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() return s.metadata.StreamRegistry().GetStream(ctx, md) } func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) { name := groupSchema.Metadata.Name opts := s.dbOpts opts.ShardNum = groupSchema.ResourceOpts.ShardNum opts.Location = path.Join(s.path, groupSchema.Metadata.Name) opts.TSTableFactory = &tsTableFactory{ bufferSize: s.bufferSize, compressionMethod: databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD, chunkSize: chunkSize, } var err error if opts.BlockInterval, err = pb_v1.ToIntervalRule(groupSchema.ResourceOpts.BlockInterval); err != nil { return nil, err } if opts.SegmentInterval, err = pb_v1.ToIntervalRule(groupSchema.ResourceOpts.SegmentInterval); err != nil { return nil, err } if opts.TTL, err = pb_v1.ToIntervalRule(groupSchema.ResourceOpts.Ttl); err != nil { return nil, err } return tsdb.OpenDatabase( common.SetPosition(context.Background(), func(p common.Position) common.Position { p.Module = "stream" p.Database = name return p }), opts) }