banyand/measure/metadata.go
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package measure
import (
"context"
"fmt"
"io"
"path"
"sync"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/pkg/errors"
"go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/api/common"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/api/validate"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/protector"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/queue/pub"
"github.com/apache/skywalking-banyandb/pkg/logger"
resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
const (
// TopNSchemaName is the name of the topN result measure schema.
TopNSchemaName = "_top_n_result"
// TopNTagFamily is the tag family name of the topN result measure.
TopNTagFamily = "_topN"
// TopNFieldName is the field name of the topN result measure.
TopNFieldName = "value"
)
var (
metadataScope = measureScope.SubScope("metadata")
topNFieldsSpec = []*databasev1.FieldSpec{{
Name: TopNFieldName,
FieldType: databasev1.FieldType_FIELD_TYPE_DATA_BINARY,
EncodingMethod: databasev1.EncodingMethod_ENCODING_METHOD_GORILLA,
CompressionMethod: databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD,
}}
// TopNTagNames is the list of tag names of the topN result measure.
TopNTagNames = []string{"name", "direction", "group"}
)
// SchemaService allows querying schema information.
type SchemaService interface {
Query
Close()
}
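// schemaRepo wraps the generic resource repository with measure-specific state,
// such as the metadata client, the local pipeline, and the per-measure topN
// processor managers.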
type schemaRepo struct {
resourceSchema.Repository
metadata metadata.Repo
pipeline queue.Queue
l *logger.Logger
topNProcessorMap sync.Map
path string
}
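// newSchemaRepo builds the schema repository backed by the given service and
// starts watching metadata events.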
func newSchemaRepo(path string, svc *service, nodeLabels map[string]string) *schemaRepo {
sr := &schemaRepo{
path: path,
l: svc.l,
metadata: svc.metadata,
pipeline: svc.localPipeline,
}
sr.Repository = resourceSchema.NewRepository(
svc.metadata,
svc.l,
newSupplier(path, svc, sr, nodeLabels),
resourceSchema.NewMetrics(svc.omr.With(metadataScope)),
)
sr.start()
return sr
}
// NewPortableRepository creates a new portable repository.
func NewPortableRepository(metadata metadata.Repo, l *logger.Logger, metrics *resourceSchema.Metrics) SchemaService {
r := &schemaRepo{
l: l,
metadata: metadata,
Repository: resourceSchema.NewPortableRepository(
metadata,
l,
newPortableSupplier(metadata, l),
metrics,
),
}
r.start()
return r
}
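// start launches the repository watcher and registers this repo as the handler
// for group, measure, index rule binding, index rule, and topN aggregation events.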
func (sr *schemaRepo) start() {
sr.Watcher()
sr.metadata.
RegisterHandler("measure", schema.KindGroup|schema.KindMeasure|schema.KindIndexRuleBinding|schema.KindIndexRule|schema.KindTopNAggregation,
sr)
}
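// Measure returns the measure identified by the given metadata, or
// ErrMeasureNotExist if it has not been loaded into the repository.
// A minimal usage sketch (the group and measure names are illustrative only):
//
//	m, err := sr.Measure(&commonv1.Metadata{Group: "sw_metric", Name: "service_cpm"})
//	if err != nil {
//		// handle ErrMeasureNotExist or wrap the error further
//	}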
func (sr *schemaRepo) Measure(metadata *commonv1.Metadata) (Measure, error) {
sm, ok := sr.loadMeasure(metadata)
if !ok {
return nil, errors.WithStack(ErrMeasureNotExist)
}
return sm, nil
}
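// GetRemovalSegmentsTimeRange returns the time range covered by the group's
// expired segments, or nil when the group or its TSDB is not available.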
func (sr *schemaRepo) GetRemovalSegmentsTimeRange(group string) *timestamp.TimeRange {
g, ok := sr.LoadGroup(group)
if !ok {
return nil
}
db := g.SupplyTSDB()
if db == nil {
return nil
}
return db.(storage.TSDB[*tsTable, option]).GetExpiredSegmentsTimeRange()
}
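// OnInit bootstraps measure schemas for every group and ensures each group owns
// the shared topN result measure. It panics if the set of registered kinds is
// not the five kinds expected by this handler.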
func (sr *schemaRepo) OnInit(kinds []schema.Kind) (bool, []int64) {
if len(kinds) != 5 {
logger.Panicf("unexpected kinds: %v", kinds)
return false, nil
}
groupNames, revs := sr.Repository.Init(schema.KindMeasure)
for i := range groupNames {
sr.createTopNResultMeasure(context.Background(), sr.metadata.MeasureRegistry(), groupNames[i])
}
return true, revs
}
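// OnAddOrUpdate handles create and update events for measure-related metadata,
// validating each schema and forwarding it to the resource repository; topN
// aggregations are additionally registered with their source measure's
// processor manager.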
func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) {
switch metadata.Kind {
case schema.KindGroup:
g := metadata.Spec.(*commonv1.Group)
if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
return
}
if err := validate.GroupForStreamOrMeasure(g); err != nil {
sr.l.Warn().Err(err).Msg("group is ignored")
return
}
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventAddOrUpdate,
Kind: resourceSchema.EventKindGroup,
Metadata: g,
})
sr.createTopNResultMeasure(context.Background(), sr.metadata.MeasureRegistry(), g.Metadata.Name)
case schema.KindMeasure:
m := metadata.Spec.(*databasev1.Measure)
if err := validate.Measure(m); err != nil {
sr.l.Warn().Err(err).Msg("measure is ignored")
return
}
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventAddOrUpdate,
Kind: resourceSchema.EventKindResource,
Metadata: m,
})
case schema.KindIndexRuleBinding:
if irb, ok := metadata.Spec.(*databasev1.IndexRuleBinding); ok {
if err := validate.IndexRuleBinding(irb); err != nil {
sr.l.Warn().Err(err).Msg("index rule binding is ignored")
return
}
if irb.GetSubject().Catalog == commonv1.Catalog_CATALOG_MEASURE {
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventAddOrUpdate,
Kind: resourceSchema.EventKindIndexRuleBinding,
Metadata: irb,
})
}
}
case schema.KindIndexRule:
if ir, ok := metadata.Spec.(*databasev1.IndexRule); ok {
if err := validate.IndexRule(ir); err != nil {
sr.l.Warn().Err(err).Msg("index rule is ignored")
return
}
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventAddOrUpdate,
Kind: resourceSchema.EventKindIndexRule,
Metadata: ir,
})
}
case schema.KindTopNAggregation:
topNSchema := metadata.Spec.(*databasev1.TopNAggregation)
if err := validate.TopNAggregation(topNSchema); err != nil {
sr.l.Warn().Err(err).Msg("topNAggregation is ignored")
return
}
manager := sr.getSteamingManager(topNSchema.SourceMeasure, sr.pipeline)
manager.register(topNSchema)
default:
}
}
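// OnDelete handles delete events for measure-related metadata, forwarding them
// to the resource repository and stopping the topN processors bound to a
// removed measure or aggregation.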
func (sr *schemaRepo) OnDelete(metadata schema.Metadata) {
switch metadata.Kind {
case schema.KindGroup:
g := metadata.Spec.(*commonv1.Group)
if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
return
}
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventDelete,
Kind: resourceSchema.EventKindGroup,
Metadata: g,
})
case schema.KindMeasure:
m := metadata.Spec.(*databasev1.Measure)
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventDelete,
Kind: resourceSchema.EventKindResource,
Metadata: m,
})
sr.stopSteamingManager(m.GetMetadata())
case schema.KindIndexRuleBinding:
if binding, ok := metadata.Spec.(*databasev1.IndexRuleBinding); ok {
if binding.GetSubject().Catalog == commonv1.Catalog_CATALOG_MEASURE {
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventDelete,
Kind: resourceSchema.EventKindIndexRuleBinding,
Metadata: binding,
})
}
}
case schema.KindIndexRule:
if rule, ok := metadata.Spec.(*databasev1.IndexRule); ok {
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventDelete,
Kind: resourceSchema.EventKindIndexRule,
Metadata: rule,
})
}
case schema.KindTopNAggregation:
topNAggregation := metadata.Spec.(*databasev1.TopNAggregation)
sr.stopSteamingManager(topNAggregation.SourceMeasure)
default:
}
}
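// Close stops every topN processor manager and then closes the underlying
// resource repository.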
func (sr *schemaRepo) Close() {
var err error
sr.topNProcessorMap.Range(func(_, val any) bool {
manager := val.(*topNProcessorManager)
err = multierr.Append(err, manager.Close())
return true
})
if err != nil {
sr.l.Error().Err(err).Msg("encountered an error while closing the schema repository")
}
sr.Repository.Close()
}
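// loadMeasure looks up the in-memory measure for the given metadata.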
func (sr *schemaRepo) loadMeasure(metadata *commonv1.Metadata) (*measure, bool) {
r, ok := sr.LoadResource(metadata)
if !ok {
return nil, false
}
s, ok := r.Delegated().(*measure)
return s, ok
}
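// loadTSDB returns the TSDB owned by the named group, or an error when the
// group or its database cannot be found.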
func (sr *schemaRepo) loadTSDB(groupName string) (storage.TSDB[*tsTable, option], error) {
if sr == nil {
return nil, fmt.Errorf("schemaRepo is nil")
}
g, ok := sr.LoadGroup(groupName)
if !ok {
return nil, fmt.Errorf("group %s not found", groupName)
}
db := g.SupplyTSDB()
if db == nil {
return nil, fmt.Errorf("group %s not found", groupName)
}
return db.(storage.TSDB[*tsTable, option]), nil
}
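// createTopNResultMeasure ensures the shared topN result measure exists in the
// given group, retrying with exponential backoff until it has been created or
// confirmed to exist.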
func (sr *schemaRepo) createTopNResultMeasure(ctx context.Context, measureSchemaRegistry schema.Measure, group string) {
md := GetTopNSchemaMetadata(group)
operation := func() error {
m, err := measureSchemaRegistry.GetMeasure(ctx, md)
if err != nil && !errors.Is(err, schema.ErrGRPCResourceNotFound) {
return errors.WithMessagef(err, "fail to get %s", md)
}
if m != nil {
return nil
}
m = GetTopNSchema(md)
if _, innerErr := measureSchemaRegistry.CreateMeasure(ctx, m); innerErr != nil {
if !errors.Is(innerErr, schema.ErrGRPCAlreadyExists) {
return errors.WithMessagef(innerErr, "fail to create new topN measure %s", m)
}
}
return nil
}
backoffStrategy := backoff.NewExponentialBackOff()
backoffStrategy.MaxElapsedTime = 0 // never stop until topN measure has been created
err := backoff.Retry(operation, backoffStrategy)
if err != nil {
logger.Panicf("fail to create topN measure %s: %v", md, err)
}
}
var _ resourceSchema.ResourceSupplier = (*supplier)(nil)
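// supplier creates measure resources and opens their backing TSDBs for the
// local schema repository.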
type supplier struct {
metadata metadata.Repo
omr observability.MetricsRegistry
l *logger.Logger
pm *protector.Memory
schemaRepo *schemaRepo
nodeLabels map[string]string
path string
option option
}
func newSupplier(path string, svc *service, sr *schemaRepo, nodeLabels map[string]string) *supplier {
return &supplier{
path: path,
metadata: svc.metadata,
l: svc.l,
option: svc.option,
omr: svc.omr,
pm: svc.pm,
schemaRepo: sr,
nodeLabels: nodeLabels,
}
}
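// OpenResource opens a measure instance for the given resource schema.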
func (s *supplier) OpenResource(spec resourceSchema.Resource) (resourceSchema.IndexListener, error) {
measureSchema := spec.Schema().(*databasev1.Measure)
return openMeasure(measureSpec{
schema: measureSchema,
}, s.l, s.pm, s.schemaRepo)
}
func (s *supplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return s.metadata.MeasureRegistry().GetMeasure(ctx, md)
}
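// OpenDB opens the group's TSDB, deriving shard count, TTL, segment interval,
// and segment idle timeout from the group's resource options and the first
// lifecycle stage whose node selector matches this node's labels.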
func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, error) {
name := groupSchema.Metadata.Name
p := common.Position{
Module: "measure",
Database: name,
}
metrics, factory := s.newMetrics(p)
ro := groupSchema.ResourceOpts
if ro == nil {
return nil, fmt.Errorf("no resource opts in group %s", name)
}
shardNum := ro.ShardNum
ttl := ro.Ttl
segInterval := ro.SegmentInterval
segmentIdleTimeout := time.Duration(0)
if len(ro.Stages) > 0 && len(s.nodeLabels) > 0 {
var ttlNum uint32
for _, st := range ro.Stages {
if st.Ttl.Unit != ro.Ttl.Unit {
return nil, fmt.Errorf("ttl unit %s is not consistent with stage %s", ro.Ttl.Unit, st.Ttl.Unit)
}
selector, err := pub.ParseLabelSelector(st.NodeSelector)
if err != nil {
return nil, errors.WithMessagef(err, "failed to parse node selector %s", st.NodeSelector)
}
ttlNum += st.Ttl.Num
if !selector.Matches(s.nodeLabels) {
continue
}
ttl.Num += ttlNum
shardNum = st.ShardNum
segInterval = st.SegmentInterval
if st.Close {
segmentIdleTimeout = 5 * time.Minute
}
break
}
}
opts := storage.TSDBOpts[*tsTable, option]{
ShardNum: shardNum,
Location: path.Join(s.path, groupSchema.Metadata.Name),
TSTableCreator: newTSTable,
TableMetrics: metrics,
SegmentInterval: storage.MustToIntervalRule(segInterval),
TTL: storage.MustToIntervalRule(ttl),
Option: s.option,
SeriesIndexFlushTimeoutSeconds: s.option.flushTimeout.Nanoseconds() / int64(time.Second),
SeriesIndexCacheMaxBytes: int(s.option.seriesCacheMaxSize),
StorageMetricsFactory: factory,
SegmentIdleTimeout: segmentIdleTimeout,
}
return storage.OpenTSDB(
common.SetPosition(context.Background(), func(_ common.Position) common.Position {
return p
}),
opts)
}
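// portableSupplier serves measure schemas without local storage; it cannot open
// a TSDB.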
type portableSupplier struct {
metadata metadata.Repo
l *logger.Logger
}
func newPortableSupplier(metadata metadata.Repo, l *logger.Logger) *portableSupplier {
return &portableSupplier{
metadata: metadata,
l: l,
}
}
func (s *portableSupplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return s.metadata.MeasureRegistry().GetMeasure(ctx, md)
}
func (*portableSupplier) OpenDB(_ *commonv1.Group) (io.Closer, error) {
panic("do not support open db")
}
func (s *portableSupplier) OpenResource(spec resourceSchema.Resource) (resourceSchema.IndexListener, error) {
measureSchema := spec.Schema().(*databasev1.Measure)
return openMeasure(measureSpec{
schema: measureSchema,
}, s.l, nil, nil)
}
// GetTopNSchema returns the schema of the topN result measure.
func GetTopNSchema(md *commonv1.Metadata) *databasev1.Measure {
return &databasev1.Measure{
Metadata: md,
TagFamilies: []*databasev1.TagFamilySpec{
{
Name: TopNTagFamily,
Tags: []*databasev1.TagSpec{
{Name: TopNTagNames[0], Type: databasev1.TagType_TAG_TYPE_STRING},
{Name: TopNTagNames[1], Type: databasev1.TagType_TAG_TYPE_INT},
{Name: TopNTagNames[2], Type: databasev1.TagType_TAG_TYPE_STRING},
},
},
},
Fields: topNFieldsSpec,
Entity: &databasev1.Entity{
TagNames: TopNTagNames,
},
}
}
// GetTopNSchemaMetadata returns the metadata of the topN result measure.
func GetTopNSchemaMetadata(group string) *commonv1.Metadata {
return &commonv1.Metadata{
Name: TopNSchemaName,
Group: group,
}
}
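// getKey builds the cache key for a measure from its group and name.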
func getKey(metadata *commonv1.Metadata) string {
return path.Join(metadata.GetGroup(), metadata.GetName())
}