banyand/stream/metadata.go
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package stream
import (
"context"
"fmt"
"io"
"path"
"time"
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/api/common"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/api/validate"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/protector"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/queue/pub"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/meter"
resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
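// metadataScope is the meter scope under which stream schema metadata metrics are registered.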
var metadataScope = streamScope.SubScope("metadata")
// SchemaService allows querying schema information.
type SchemaService interface {
Query
Close()
}
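// schemaRepo ties the stream schemas held by the metadata service to their local resources and storage.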
type schemaRepo struct {
resourceSchema.Repository
l *logger.Logger
metadata metadata.Repo
path string
}
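// newSchemaRepo creates a schemaRepo rooted at path and starts watching schema changes.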
func newSchemaRepo(path string, svc *service, nodeLabels map[string]string) schemaRepo {
sr := schemaRepo{
l: svc.l,
path: path,
metadata: svc.metadata,
Repository: resourceSchema.NewRepository(
svc.metadata,
svc.l,
newSupplier(path, svc, nodeLabels),
resourceSchema.NewMetrics(svc.omr.With(metadataScope)),
),
}
sr.start()
return sr
}
// NewPortableRepository creates a new portable repository.
func NewPortableRepository(metadata metadata.Repo, l *logger.Logger, metrics *resourceSchema.Metrics) SchemaService {
r := &schemaRepo{
l: l,
metadata: metadata,
Repository: resourceSchema.NewPortableRepository(
metadata,
l,
newPortableSupplier(metadata, l),
metrics,
),
}
r.start()
return r
}
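// start begins watching schema events and registers the repository as a handler for group, stream, index-rule-binding, and index-rule kinds.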
func (sr *schemaRepo) start() {
sr.Watcher()
sr.metadata.
RegisterHandler("stream", schema.KindGroup|schema.KindStream|schema.KindIndexRuleBinding|schema.KindIndexRule,
sr)
}
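// Stream returns the stream identified by metadata, or ErrStreamNotExist if it is not loaded.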
func (sr *schemaRepo) Stream(metadata *commonv1.Metadata) (Stream, error) {
sm, ok := sr.loadStream(metadata)
if !ok {
return nil, errors.WithStack(ErrStreamNotExist)
}
return sm, nil
}
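// GetRemovalSegmentsTimeRange returns the time range covered by the group's expired segments, or nil if the group or its TSDB is absent.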
func (sr *schemaRepo) GetRemovalSegmentsTimeRange(group string) *timestamp.TimeRange {
g, ok := sr.LoadGroup(group)
if !ok {
return nil
}
db := g.SupplyTSDB()
if db == nil {
return nil
}
return db.(storage.TSDB[*tsTable, option]).GetExpiredSegmentsTimeRange()
}
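// OnInit bootstraps the repository from the existing stream schemas and returns their revisions.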
func (sr *schemaRepo) OnInit(kinds []schema.Kind) (bool, []int64) {
if len(kinds) != 4 {
logger.Panicf("invalid kinds: %v", kinds)
return false, nil
}
_, revs := sr.Repository.Init(schema.KindStream)
return true, revs
}
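// OnAddOrUpdate validates the changed schema object and forwards stream-related changes to the repository as add-or-update events.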
func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) {
switch metadata.Kind {
case schema.KindGroup:
g := metadata.Spec.(*commonv1.Group)
if g.Catalog != commonv1.Catalog_CATALOG_STREAM {
return
}
if err := validate.GroupForStreamOrMeasure(g); err != nil {
sr.l.Warn().Err(err).Msg("group is ignored")
return
}
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventAddOrUpdate,
Kind: resourceSchema.EventKindGroup,
Metadata: g,
})
case schema.KindStream:
if err := validate.Stream(metadata.Spec.(*databasev1.Stream)); err != nil {
sr.l.Warn().Err(err).Msg("stream is ignored")
return
}
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventAddOrUpdate,
Kind: resourceSchema.EventKindResource,
Metadata: metadata.Spec.(*databasev1.Stream),
})
case schema.KindIndexRuleBinding:
if irb, ok := metadata.Spec.(*databasev1.IndexRuleBinding); ok {
if err := validate.IndexRuleBinding(irb); err != nil {
sr.l.Warn().Err(err).Msg("index rule binding is ignored")
return
}
if irb.GetSubject().Catalog == commonv1.Catalog_CATALOG_STREAM {
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventAddOrUpdate,
Kind: resourceSchema.EventKindIndexRuleBinding,
Metadata: irb,
})
}
}
case schema.KindIndexRule:
if ir, ok := metadata.Spec.(*databasev1.IndexRule); ok {
if err := validate.IndexRule(ir); err != nil {
sr.l.Warn().Err(err).Msg("index rule is ignored")
return
}
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventAddOrUpdate,
Kind: resourceSchema.EventKindIndexRule,
Metadata: ir,
})
}
default:
}
}
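// OnDelete forwards deletions of stream-related schema objects to the repository as delete events.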
func (sr *schemaRepo) OnDelete(metadata schema.Metadata) {
switch metadata.Kind {
case schema.KindGroup:
g := metadata.Spec.(*commonv1.Group)
if g.Catalog != commonv1.Catalog_CATALOG_STREAM {
return
}
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventDelete,
Kind: resourceSchema.EventKindGroup,
Metadata: g,
})
case schema.KindStream:
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventDelete,
Kind: resourceSchema.EventKindResource,
Metadata: metadata.Spec.(*databasev1.Stream),
})
case schema.KindIndexRuleBinding:
if binding, ok := metadata.Spec.(*databasev1.IndexRuleBinding); ok {
if binding.GetSubject().Catalog == commonv1.Catalog_CATALOG_STREAM {
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventDelete,
Kind: resourceSchema.EventKindIndexRuleBinding,
Metadata: binding,
})
}
}
case schema.KindIndexRule:
if rule, ok := metadata.Spec.(*databasev1.IndexRule); ok {
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventDelete,
Kind: resourceSchema.EventKindIndexRule,
Metadata: rule,
})
}
default:
}
}
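// loadStream looks up the resource for metadata and returns its delegated *stream.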
func (sr *schemaRepo) loadStream(metadata *commonv1.Metadata) (*stream, bool) {
r, ok := sr.LoadResource(metadata)
if !ok {
return nil, false
}
s, ok := r.Delegated().(*stream)
return s, ok
}
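// loadTSDB returns the TSDB backing the named group.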
func (sr *schemaRepo) loadTSDB(groupName string) (storage.TSDB[*tsTable, option], error) {
g, ok := sr.LoadGroup(groupName)
if !ok {
return nil, fmt.Errorf("group %s not found", groupName)
}
db := g.SupplyTSDB()
if db == nil {
return nil, fmt.Errorf("tsdb for group %s not found", groupName)
}
return db.(storage.TSDB[*tsTable, option]), nil
}
var _ resourceSchema.ResourceSupplier = (*supplier)(nil)
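// supplier creates stream resources and opens their local TSDBs for the schema repository.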
type supplier struct {
metadata metadata.Repo
pipeline queue.Queue
omr observability.MetricsRegistry
l *logger.Logger
pm *protector.Memory
schemaRepo *schemaRepo
nodeLabels map[string]string
path string
option option
}
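// newSupplier assembles a supplier from the service's dependencies, rooted at the given data path.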
func newSupplier(path string, svc *service, nodeLabels map[string]string) *supplier {
return &supplier{
path: path,
metadata: svc.metadata,
l: svc.l,
pipeline: svc.localPipeline,
option: svc.option,
omr: svc.omr,
pm: svc.pm,
schemaRepo: &svc.schemaRepo,
nodeLabels: nodeLabels,
}
}
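// OpenResource opens the stream defined by the resource's schema and returns it as an index listener.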
func (s *supplier) OpenResource(spec resourceSchema.Resource) (resourceSchema.IndexListener, error) {
streamSchema := spec.Schema().(*databasev1.Stream)
return openStream(streamSpec{
schema: streamSchema,
}, s.l, s.pm, s.schemaRepo), nil
}
func (s *supplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return s.metadata.StreamRegistry().GetStream(ctx, md)
}
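// OpenDB opens the TSDB for the group, applying the resource options of the first lifecycle stage whose node selector matches this node's labels.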
func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, error) {
name := groupSchema.Metadata.Name
p := common.Position{
Module: "stream",
Database: name,
}
ro := groupSchema.ResourceOpts
if ro == nil {
return nil, fmt.Errorf("no resource opts in group %s", name)
}
shardNum := ro.ShardNum
ttl := ro.Ttl
segInterval := ro.SegmentInterval
segmentIdleTimeout := time.Duration(0)
if len(ro.Stages) > 0 && len(s.nodeLabels) > 0 {
var ttlNum uint32
for _, st := range ro.Stages {
if st.Ttl.Unit != ro.Ttl.Unit {
return nil, fmt.Errorf("ttl unit %s is not consistent with stage %s", ro.Ttl.Unit, st.Ttl.Unit)
}
selector, err := pub.ParseLabelSelector(st.NodeSelector)
if err != nil {
return nil, errors.WithMessagef(err, "failed to parse node selector %s", st.NodeSelector)
}
ttlNum += st.Ttl.Num
if !selector.Matches(s.nodeLabels) {
continue
}
ttl.Num += ttlNum
shardNum = st.ShardNum
segInterval = st.SegmentInterval
if st.Close {
segmentIdleTimeout = 5 * time.Minute
}
break
}
}
opts := storage.TSDBOpts[*tsTable, option]{
ShardNum: shardNum,
Location: path.Join(s.path, groupSchema.Metadata.Name),
TSTableCreator: newTSTable,
TableMetrics: s.newMetrics(p),
SegmentInterval: storage.MustToIntervalRule(segInterval),
TTL: storage.MustToIntervalRule(ttl),
Option: s.option,
SeriesIndexFlushTimeoutSeconds: s.option.flushTimeout.Nanoseconds() / int64(time.Second),
SeriesIndexCacheMaxBytes: int(s.option.seriesCacheMaxSize),
StorageMetricsFactory: s.omr.With(storageScope.ConstLabels(meter.ToLabelPairs(common.DBLabelNames(), p.DBLabelValues()))),
SegmentIdleTimeout: segmentIdleTimeout,
}
return storage.OpenTSDB(
common.SetPosition(context.Background(), func(_ common.Position) common.Position {
return p
}),
opts)
}
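// portableSupplier serves stream schemas from the metadata service without opening local storage.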
type portableSupplier struct {
metadata metadata.Repo
l *logger.Logger
}
func newPortableSupplier(metadata metadata.Repo, l *logger.Logger) *portableSupplier {
return &portableSupplier{
metadata: metadata,
l: l,
}
}
func (s *portableSupplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return s.metadata.StreamRegistry().GetStream(ctx, md)
}
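// OpenDB always panics: a portable repository never opens local storage.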
func (*portableSupplier) OpenDB(_ *commonv1.Group) (io.Closer, error) {
panic("do not support open db")
}
func (s *portableSupplier) OpenResource(spec resourceSchema.Resource) (resourceSchema.IndexListener, error) {
streamSchema := spec.Schema().(*databasev1.Stream)
return openStream(streamSpec{
schema: streamSchema,
}, s.l, nil, nil), nil
}