banyand/measure/metadata.go (308 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package measure
import (
"context"
"fmt"
"path"
"time"
"github.com/google/go-cmp/cmp"
"github.com/pkg/errors"
"google.golang.org/protobuf/testing/protocmp"
"github.com/apache/skywalking-banyandb/api/common"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
pb_v1 "github.com/apache/skywalking-banyandb/pkg/pb/v1/tsdb"
resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
)
type schemaRepo struct {
resourceSchema.Repository
l *logger.Logger
metadata metadata.Repo
}
func newSchemaRepo(path string, metadata metadata.Repo,
dbOpts tsdb.DatabaseOpts, l *logger.Logger, pipeline queue.Queue, encoderBufferSize, bufferSize int64,
) schemaRepo {
return schemaRepo{
l: l,
metadata: metadata,
Repository: resourceSchema.NewRepository(
metadata,
l,
newSupplier(path, metadata, dbOpts, l, pipeline, encoderBufferSize, bufferSize),
),
}
}
func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) {
switch metadata.Kind {
case schema.KindGroup:
g := metadata.Spec.(*commonv1.Group)
if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
return
}
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventAddOrUpdate,
Kind: resourceSchema.EventKindGroup,
Metadata: g.GetMetadata(),
})
case schema.KindMeasure:
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventAddOrUpdate,
Kind: resourceSchema.EventKindResource,
Metadata: metadata.Spec.(*databasev1.Measure).GetMetadata(),
})
case schema.KindIndexRuleBinding:
irb, ok := metadata.Spec.(*databasev1.IndexRuleBinding)
if !ok {
sr.l.Warn().Msg("fail to convert message to IndexRuleBinding")
return
}
if irb.GetSubject().Catalog == commonv1.Catalog_CATALOG_MEASURE {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
stm, err := sr.metadata.MeasureRegistry().GetMeasure(ctx, &commonv1.Metadata{
Name: irb.GetSubject().GetName(),
Group: metadata.Group,
})
cancel()
if err != nil {
sr.l.Error().Err(err).Msg("fail to get subject")
return
}
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventAddOrUpdate,
Kind: resourceSchema.EventKindResource,
Metadata: stm.GetMetadata(),
})
}
case schema.KindIndexRule:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
subjects, err := sr.metadata.Subjects(ctx, metadata.Spec.(*databasev1.IndexRule), commonv1.Catalog_CATALOG_MEASURE)
if err != nil {
sr.l.Error().Err(err).Msg("fail to get subjects(measure)")
return
}
for _, sub := range subjects {
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventAddOrUpdate,
Kind: resourceSchema.EventKindResource,
Metadata: sub.(*databasev1.Measure).GetMetadata(),
})
}
case schema.KindTopNAggregation:
// createOrUpdate TopN schemas in advance
_, err := createOrUpdateTopNMeasure(sr.metadata.MeasureRegistry(), metadata.Spec.(*databasev1.TopNAggregation))
if err != nil {
sr.l.Error().Err(err).Msg("fail to create/update topN measure")
return
}
// reload source measure
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventAddOrUpdate,
Kind: resourceSchema.EventKindResource,
Metadata: metadata.Spec.(*databasev1.TopNAggregation).GetSourceMeasure(),
})
default:
}
}
func createOrUpdateTopNMeasure(measureSchemaRegistry schema.Measure, topNSchema *databasev1.TopNAggregation) (*databasev1.Measure, error) {
oldTopNSchema, err := measureSchemaRegistry.GetMeasure(context.TODO(), topNSchema.GetMetadata())
if err != nil && !errors.Is(err, schema.ErrGRPCResourceNotFound) {
return nil, err
}
sourceMeasureSchema, err := measureSchemaRegistry.GetMeasure(context.Background(), topNSchema.GetSourceMeasure())
if err != nil {
return nil, err
}
tagNames := sourceMeasureSchema.GetEntity().GetTagNames()
seriesSpecs := make([]*databasev1.TagSpec, 0, len(tagNames))
for _, tagName := range tagNames {
var found bool
for _, fSpec := range sourceMeasureSchema.GetTagFamilies() {
for _, tSpec := range fSpec.GetTags() {
if tSpec.GetName() == tagName {
seriesSpecs = append(seriesSpecs, tSpec)
found = true
goto CHECK
}
}
}
CHECK:
if !found {
return nil, fmt.Errorf("fail to find tag spec %s", tagName)
}
}
// create a new "derived" measure for TopN result
newTopNMeasure := &databasev1.Measure{
Metadata: topNSchema.GetMetadata(),
Interval: sourceMeasureSchema.GetInterval(),
TagFamilies: []*databasev1.TagFamilySpec{
{
Name: TopNTagFamily,
Tags: append([]*databasev1.TagSpec{
{
Name: "measure_id",
Type: databasev1.TagType_TAG_TYPE_STRING,
},
}, seriesSpecs...),
},
},
Fields: []*databasev1.FieldSpec{TopNValueFieldSpec},
}
if oldTopNSchema == nil {
if innerErr := measureSchemaRegistry.CreateMeasure(context.Background(), newTopNMeasure); innerErr != nil {
return nil, innerErr
}
return newTopNMeasure, nil
}
// compare with the old one
if cmp.Diff(newTopNMeasure, oldTopNSchema,
protocmp.IgnoreUnknown(),
protocmp.IgnoreFields(&databasev1.Measure{}, "updated_at"),
protocmp.IgnoreFields(&commonv1.Metadata{}, "id", "create_revision", "mod_revision"),
protocmp.Transform()) == "" {
return oldTopNSchema, nil
}
// update
if err = measureSchemaRegistry.UpdateMeasure(context.Background(), newTopNMeasure); err != nil {
return nil, err
}
return newTopNMeasure, nil
}
func (sr *schemaRepo) OnDelete(metadata schema.Metadata) {
switch metadata.Kind {
case schema.KindGroup:
g := metadata.Spec.(*commonv1.Group)
if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
return
}
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventDelete,
Kind: resourceSchema.EventKindGroup,
Metadata: g.GetMetadata(),
})
case schema.KindMeasure:
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventDelete,
Kind: resourceSchema.EventKindResource,
Metadata: metadata.Spec.(*databasev1.Measure).GetMetadata(),
})
case schema.KindIndexRuleBinding:
if metadata.Spec.(*databasev1.IndexRuleBinding).GetSubject().Catalog == commonv1.Catalog_CATALOG_MEASURE {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
m, err := sr.metadata.MeasureRegistry().GetMeasure(ctx, &commonv1.Metadata{
Name: metadata.Name,
Group: metadata.Group,
})
if err != nil {
sr.l.Error().Err(err).Msg("fail to get subject")
return
}
// we should update instead of delete
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventAddOrUpdate,
Kind: resourceSchema.EventKindResource,
Metadata: m.GetMetadata(),
})
}
case schema.KindIndexRule:
case schema.KindTopNAggregation:
err := sr.removeTopNMeasure(metadata.Spec.(*databasev1.TopNAggregation).GetSourceMeasure())
if err != nil {
sr.l.Error().Err(err).Msg("fail to remove topN measure")
return
}
// we should update instead of delete
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventAddOrUpdate,
Kind: resourceSchema.EventKindResource,
Metadata: metadata.Spec.(*databasev1.TopNAggregation).GetSourceMeasure(),
})
default:
}
}
func (sr *schemaRepo) removeTopNMeasure(metadata *commonv1.Metadata) error {
_, err := sr.metadata.MeasureRegistry().DeleteMeasure(context.Background(), metadata)
return err
}
func (sr *schemaRepo) loadMeasure(metadata *commonv1.Metadata) (*measure, bool) {
r, ok := sr.LoadResource(metadata)
if !ok {
return nil, false
}
s, ok := r.(*measure)
return s, ok
}
var _ resourceSchema.ResourceSupplier = (*supplier)(nil)
type supplier struct {
metadata metadata.Repo
pipeline queue.Queue
l *logger.Logger
path string
dbOpts tsdb.DatabaseOpts
encoderBufferSize, bufferSize int64
}
func newSupplier(path string, metadata metadata.Repo, dbOpts tsdb.DatabaseOpts, l *logger.Logger,
pipeline queue.Queue, encoderBufferSize, bufferSize int64,
) *supplier {
return &supplier{
path: path,
dbOpts: dbOpts,
metadata: metadata,
l: l,
pipeline: pipeline,
encoderBufferSize: encoderBufferSize,
bufferSize: bufferSize,
}
}
func (s *supplier) OpenResource(shardNum uint32, db tsdb.Supplier, spec resourceSchema.ResourceSpec) (resourceSchema.Resource, error) {
measureSchema := spec.Schema.(*databasev1.Measure)
return openMeasure(shardNum, db, measureSpec{
schema: measureSchema,
indexRules: spec.IndexRules,
topNAggregations: spec.Aggregations,
}, s.l, s.pipeline)
}
func (s *supplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return s.metadata.MeasureRegistry().GetMeasure(ctx, md)
}
func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) {
opts := s.dbOpts
opts.ShardNum = groupSchema.ResourceOpts.ShardNum
opts.Location = path.Join(s.path, groupSchema.Metadata.Name)
name := groupSchema.Metadata.Name
opts.TSTableFactory = &tsTableFactory{
bufferSize: s.bufferSize,
encoderBufferSize: s.encoderBufferSize,
encoderPool: encoding.NewEncoderPool(name, intChunkNum, intervalFn),
decoderPool: encoding.NewDecoderPool(name, intChunkNum, intervalFn),
compressionMethod: databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD,
encodingChunkSize: intChunkSize,
plainChunkSize: plainChunkSize,
}
var err error
if opts.BlockInterval, err = pb_v1.ToIntervalRule(groupSchema.ResourceOpts.BlockInterval); err != nil {
return nil, err
}
if opts.SegmentInterval, err = pb_v1.ToIntervalRule(groupSchema.ResourceOpts.SegmentInterval); err != nil {
return nil, err
}
if opts.TTL, err = pb_v1.ToIntervalRule(groupSchema.ResourceOpts.Ttl); err != nil {
return nil, err
}
return tsdb.OpenDatabase(
common.SetPosition(context.Background(), func(p common.Position) common.Position {
p.Module = "measure"
p.Database = name
return p
}),
opts)
}
func intervalFn(key []byte) time.Duration {
_, interval, err := pbv1.DecodeFieldFlag(key)
if err != nil {
panic(err)
}
return interval
}