pkg/schema/metadata.go (395 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Package schema implements a framework to sync schema info from the metadata repository.
package schema
import (
"context"
"io"
"math"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"go.uber.org/multierr"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
)
// EventType defines actions of events.
type EventType uint8
// EventType support Add/Update and Delete.
// All events are idempotent.
const (
EventAddOrUpdate EventType = iota
EventDelete
)
// EventKind defines category of events.
type EventKind uint8
// This framework groups events to a hierarchy. A group is the root node.
const (
EventKindGroup EventKind = iota
EventKindResource
)
// Group is the root node, allowing get resources from its sub nodes.
type Group interface {
GetSchema() *commonv1.Group
StoreResource(resourceSchema ResourceSchema) (Resource, error)
LoadResource(name string) (Resource, bool)
}
// MetadataEvent is the syncing message between metadata repo and this framework.
type MetadataEvent struct {
Metadata *commonv1.Metadata
Typ EventType
Kind EventKind
}
// ResourceSchema allows get the metadata.
type ResourceSchema interface {
GetMetadata() *commonv1.Metadata
}
// ResourceSpec wraps required fields to open a resource.
type ResourceSpec struct {
Schema ResourceSchema
// IndexRules are index rules bound to the Schema
IndexRules []*databasev1.IndexRule
// Aggregations are topN aggregation bound to the Schema, e.g. TopNAggregation
Aggregations []*databasev1.TopNAggregation
}
// Resource allows access metadata from a local cache.
type Resource interface {
GetIndexRules() []*databasev1.IndexRule
GetTopN() []*databasev1.TopNAggregation
MaxObservedModRevision() int64
ResourceSchema
io.Closer
}
// ResourceSupplier allows open a resource and its embedded tsdb.
type ResourceSupplier interface {
OpenResource(shardNum uint32, db tsdb.Supplier, spec ResourceSpec) (Resource, error)
ResourceSchema(metdata *commonv1.Metadata) (ResourceSchema, error)
OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error)
}
// Repository is the collection of several hierarchies groups by a "Group".
type Repository interface {
Watcher()
SendMetadataEvent(MetadataEvent)
StoreGroup(groupMeta *commonv1.Metadata) (*group, error)
LoadGroup(name string) (Group, bool)
LoadResource(metadata *commonv1.Metadata) (Resource, bool)
Close()
StopCh() <-chan struct{}
}
const defaultWorkerNum = 10
var _ Repository = (*schemaRepo)(nil)
type schemaRepo struct {
metadata metadata.Repo
resourceSupplier ResourceSupplier
l *logger.Logger
data map[string]*group
workerCloser *run.Closer
closer *run.Closer
eventCh chan MetadataEvent
workerNum int
sync.RWMutex
}
// StopCh implements Repository.
func (sr *schemaRepo) StopCh() <-chan struct{} {
return sr.closer.CloseNotify()
}
// NewRepository return a new Repository.
func NewRepository(
metadata metadata.Repo,
l *logger.Logger,
resourceSupplier ResourceSupplier,
) Repository {
return &schemaRepo{
metadata: metadata,
l: l,
resourceSupplier: resourceSupplier,
data: make(map[string]*group),
eventCh: make(chan MetadataEvent, defaultWorkerNum),
workerNum: defaultWorkerNum,
workerCloser: run.NewCloser(defaultWorkerNum),
closer: run.NewCloser(1),
}
}
func (sr *schemaRepo) SendMetadataEvent(event MetadataEvent) {
sr.eventCh <- event
}
func (sr *schemaRepo) Watcher() {
for i := 0; i < sr.workerNum; i++ {
go func() {
defer func() {
sr.workerCloser.Done()
if err := recover(); err != nil {
sr.l.Warn().Interface("err", err).Msg("watching the events")
}
}()
for {
select {
case evt, more := <-sr.eventCh:
if !more {
return
}
if e := sr.l.Debug(); e.Enabled() {
e.Interface("event", evt).Msg("received an event")
}
var err error
switch evt.Typ {
case EventAddOrUpdate:
switch evt.Kind {
case EventKindGroup:
_, err = sr.StoreGroup(evt.Metadata)
case EventKindResource:
_, err = sr.storeResource(evt.Metadata)
}
case EventDelete:
switch evt.Kind {
case EventKindGroup:
err = sr.deleteGroup(evt.Metadata)
case EventKindResource:
err = sr.deleteResource(evt.Metadata)
}
}
if err != nil && !errors.Is(err, schema.ErrClosed) {
sr.l.Err(err).Interface("event", evt).Msg("fail to handle the metadata event. retry...")
select {
case sr.eventCh <- evt:
case <-sr.workerCloser.CloseNotify():
return
}
}
case <-sr.workerCloser.CloseNotify():
return
}
}
}()
}
}
func (sr *schemaRepo) StoreGroup(groupMeta *commonv1.Metadata) (*group, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
groupSchema, err := sr.metadata.GroupRegistry().GetGroup(ctx, groupMeta.GetName())
if err != nil {
return nil, err
}
name := groupSchema.GetMetadata().GetName()
sr.Lock()
defer sr.Unlock()
g, ok := sr.getGroup(name)
if !ok {
sr.l.Info().Str("group", name).Msg("creating a tsdb")
var db tsdb.Database
db, err = sr.resourceSupplier.OpenDB(groupSchema)
if err != nil {
return nil, err
}
g = newGroup(groupSchema, sr.metadata, db, sr.l, sr.resourceSupplier)
sr.data[name] = g
return g, nil
}
prevGroupSchema := g.GetSchema()
if groupSchema.GetMetadata().GetModRevision() <= prevGroupSchema.Metadata.ModRevision {
return g, nil
}
sr.l.Info().Str("group", name).Msg("closing the previous tsdb")
db := g.SupplyTSDB()
db.Close()
sr.l.Info().Str("group", name).Msg("creating a new tsdb")
newDB, err := sr.resourceSupplier.OpenDB(groupSchema)
if err != nil {
return nil, err
}
g.setDB(newDB)
g.groupSchema.Store(groupSchema)
return g, nil
}
func (sr *schemaRepo) deleteGroup(groupMeta *commonv1.Metadata) error {
name := groupMeta.GetName()
sr.Lock()
defer sr.Unlock()
var ok bool
g, ok := sr.getGroup(name)
if !ok {
return nil
}
err := g.close()
if err != nil {
return err
}
delete(sr.data, name)
return nil
}
func (sr *schemaRepo) getGroup(name string) (*group, bool) {
g := sr.data[name]
if g == nil {
return nil, false
}
return g, true
}
func (sr *schemaRepo) LoadGroup(name string) (Group, bool) {
sr.RLock()
defer sr.RUnlock()
return sr.getGroup(name)
}
func (sr *schemaRepo) LoadResource(metadata *commonv1.Metadata) (Resource, bool) {
g, ok := sr.LoadGroup(metadata.Group)
if !ok {
return nil, false
}
return g.LoadResource(metadata.Name)
}
func (sr *schemaRepo) storeResource(metadata *commonv1.Metadata) (Resource, error) {
group, ok := sr.LoadGroup(metadata.Group)
if !ok {
var err error
if group, err = sr.StoreGroup(&commonv1.Metadata{Name: metadata.Group}); err != nil {
return nil, errors.WithMessagef(err, "create unknown group:%s", metadata.Group)
}
}
stm, err := sr.resourceSupplier.ResourceSchema(metadata)
if err != nil {
return nil, errors.WithMessage(err, "fails to get the resource")
}
return group.StoreResource(stm)
}
func (sr *schemaRepo) deleteResource(metadata *commonv1.Metadata) error {
g, ok := sr.LoadGroup(metadata.Group)
if !ok {
return nil
}
return g.(*group).deleteResource(metadata)
}
func (sr *schemaRepo) Close() {
defer func() {
if err := recover(); err != nil {
sr.l.Warn().Interface("err", err).Msg("closing resource")
}
}()
sr.workerCloser.CloseThenWait()
sr.RLock()
defer sr.RUnlock()
for _, g := range sr.data {
err := g.close()
if err != nil {
sr.l.Err(err).RawJSON("group", logger.Proto(g.GetSchema().Metadata)).Msg("closing")
}
}
sr.closer.Done()
sr.closer.CloseThenWait()
}
var _ Group = (*group)(nil)
type group struct {
resourceSupplier ResourceSupplier
metadata metadata.Repo
db atomic.Value
groupSchema atomic.Pointer[commonv1.Group]
l *logger.Logger
schemaMap map[string]Resource
mapMutex sync.RWMutex
}
func newGroup(
groupSchema *commonv1.Group,
metadata metadata.Repo,
db tsdb.Database,
l *logger.Logger,
resourceSupplier ResourceSupplier,
) *group {
g := &group{
groupSchema: atomic.Pointer[commonv1.Group]{},
metadata: metadata,
l: l,
schemaMap: make(map[string]Resource),
resourceSupplier: resourceSupplier,
}
g.groupSchema.Store(groupSchema)
g.db.Store(db)
return g
}
func (g *group) GetSchema() *commonv1.Group {
return g.groupSchema.Load()
}
func (g *group) SupplyTSDB() tsdb.Database {
return g.db.Load().(tsdb.Database)
}
func (g *group) setDB(db tsdb.Database) {
g.db.Store(db)
}
func (g *group) StoreResource(resourceSchema ResourceSchema) (Resource, error) {
g.mapMutex.Lock()
defer g.mapMutex.Unlock()
key := resourceSchema.GetMetadata().GetName()
preResource := g.schemaMap[key]
if preResource != nil &&
resourceSchema.GetMetadata().GetModRevision() <= preResource.GetMetadata().GetModRevision() {
// we only need to check the max modifications revision observed for index rules
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
idxRules, errIndexRules := g.metadata.IndexRules(ctx, resourceSchema.GetMetadata())
cancel()
if errIndexRules != nil {
return nil, errIndexRules
}
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
topNAggrs, errTopN := g.metadata.MeasureRegistry().TopNAggregations(ctx, resourceSchema.GetMetadata())
cancel()
if errTopN != nil {
return nil, errTopN
}
if len(idxRules) == len(preResource.GetIndexRules()) && len(topNAggrs) == len(preResource.GetTopN()) {
maxModRevision := int64(math.Max(float64(ParseMaxModRevision(idxRules)), float64(ParseMaxModRevision(topNAggrs))))
if preResource.MaxObservedModRevision() >= maxModRevision {
return preResource, nil
}
}
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
idxRules, err := g.metadata.IndexRules(ctx, resourceSchema.GetMetadata())
if err != nil {
return nil, err
}
var topNAggrs []*databasev1.TopNAggregation
if _, ok := resourceSchema.(*databasev1.Measure); ok {
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
var innerErr error
topNAggrs, innerErr = g.metadata.MeasureRegistry().TopNAggregations(ctx, resourceSchema.GetMetadata())
cancel()
if innerErr != nil {
return nil, innerErr
}
}
sm, errTS := g.resourceSupplier.OpenResource(g.GetSchema().GetResourceOpts().ShardNum, g, ResourceSpec{
Schema: resourceSchema,
IndexRules: idxRules,
Aggregations: topNAggrs,
})
if errTS != nil {
return nil, errTS
}
g.schemaMap[key] = sm
if preResource != nil {
_ = preResource.Close()
}
return sm, nil
}
func (g *group) deleteResource(metadata *commonv1.Metadata) error {
g.mapMutex.Lock()
defer g.mapMutex.Unlock()
key := metadata.GetName()
preResource := g.schemaMap[key]
if preResource == nil {
return nil
}
delete(g.schemaMap, key)
_ = preResource.Close()
return nil
}
func (g *group) LoadResource(name string) (Resource, bool) {
g.mapMutex.RLock()
s := g.schemaMap[name]
g.mapMutex.RUnlock()
if s == nil {
return nil, false
}
return s, true
}
func (g *group) close() (err error) {
g.mapMutex.RLock()
for _, s := range g.schemaMap {
err = multierr.Append(err, s.Close())
}
g.mapMutex.RUnlock()
return multierr.Append(err, g.SupplyTSDB().Close())
}
// ParseMaxModRevision gives the max revision from resources' metadata.
func ParseMaxModRevision[T ResourceSchema](indexRules []T) (maxRevisionForIdxRules int64) {
maxRevisionForIdxRules = int64(0)
for _, idxRule := range indexRules {
if idxRule.GetMetadata().GetModRevision() > maxRevisionForIdxRules {
maxRevisionForIdxRules = idxRule.GetMetadata().GetModRevision()
}
}
return
}