pkg/schema/cache.go (496 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package schema
import (
"context"
"io"
"path"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"go.uber.org/multierr"
"google.golang.org/protobuf/proto"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/pkg/cgroups"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
)
var _ Resource = (*resourceSpec)(nil)
type resourceSpec struct {
schema ResourceSchema
delegated IndexListener
}
func (rs *resourceSpec) Delegated() IndexListener {
return rs.delegated
}
func (rs *resourceSpec) Schema() ResourceSchema {
return rs.schema
}
func (rs *resourceSpec) maxRevision() int64 {
return rs.schema.GetMetadata().GetModRevision()
}
func (rs *resourceSpec) isNewThan(other *resourceSpec) bool {
return other.maxRevision() <= rs.maxRevision()
}
const maxWorkerNum = 8
func getWorkerNum() int {
maxProcs := cgroups.CPUs()
if maxProcs > maxWorkerNum {
return maxWorkerNum
}
return maxProcs
}
var _ Repository = (*schemaRepo)(nil)
type schemaRepo struct {
metadata metadata.Repo
resourceSupplier ResourceSupplier
resourceSchemaSupplier ResourceSchemaSupplier
l *logger.Logger
closer *run.ChannelCloser
eventCh chan MetadataEvent
metrics *Metrics
groupMap sync.Map
resourceMap sync.Map
indexRuleMap sync.Map
bindingForwardMap sync.Map
bindingBackwardMap sync.Map
workerNum int
resourceMutex sync.Mutex
groupMux sync.Mutex
}
func (sr *schemaRepo) SendMetadataEvent(event MetadataEvent) {
if !sr.closer.AddSender() {
return
}
defer sr.closer.SenderDone()
select {
case sr.eventCh <- event:
case <-sr.closer.CloseNotify():
}
}
// StopCh implements Repository.
func (sr *schemaRepo) StopCh() <-chan struct{} {
return sr.closer.CloseNotify()
}
// NewRepository return a new Repository.
func NewRepository(
metadata metadata.Repo,
l *logger.Logger,
resourceSupplier ResourceSupplier,
metrics *Metrics,
) Repository {
workNum := getWorkerNum()
return &schemaRepo{
metadata: metadata,
l: l,
resourceSupplier: resourceSupplier,
resourceSchemaSupplier: resourceSupplier,
eventCh: make(chan MetadataEvent, workNum),
workerNum: workNum,
closer: run.NewChannelCloser(),
metrics: metrics,
}
}
// NewPortableRepository return a new Repository without tsdb.
func NewPortableRepository(
metadata metadata.Repo,
l *logger.Logger,
supplier ResourceSchemaSupplier,
metrics *Metrics,
) Repository {
workNum := getWorkerNum()
return &schemaRepo{
metadata: metadata,
l: l,
resourceSchemaSupplier: supplier,
eventCh: make(chan MetadataEvent, workNum),
workerNum: workNum,
closer: run.NewChannelCloser(),
metrics: metrics,
}
}
func (sr *schemaRepo) Watcher() {
for i := 0; i < sr.workerNum; i++ {
go func() {
if !sr.closer.AddReceiver() {
return
}
defer func() {
sr.closer.ReceiverDone()
if err := recover(); err != nil {
sr.l.Warn().Interface("err", err).Msg("watching the events")
}
sr.metrics.totalPanics.Inc(1)
}()
for {
select {
case evt, more := <-sr.eventCh:
if !more {
return
}
if e := sr.l.Debug(); e.Enabled() {
e.Interface("event", evt).Msg("received an event")
}
var err error
switch evt.Typ {
case EventAddOrUpdate:
switch evt.Kind {
case EventKindGroup:
_, err = sr.storeGroup(evt.Metadata.GetMetadata())
if errors.As(err, schema.ErrGRPCResourceNotFound) {
err = nil
}
case EventKindResource:
err = sr.storeResource(evt.Metadata)
case EventKindIndexRule:
indexRule := evt.Metadata.(*databasev1.IndexRule)
sr.storeIndexRule(indexRule)
case EventKindIndexRuleBinding:
indexRuleBinding := evt.Metadata.(*databasev1.IndexRuleBinding)
sr.storeIndexRuleBinding(indexRuleBinding)
}
case EventDelete:
switch evt.Kind {
case EventKindGroup:
err = sr.deleteGroup(evt.Metadata.GetMetadata())
case EventKindResource:
sr.deleteResource(evt.Metadata.GetMetadata())
case EventKindIndexRule:
key := getKey(evt.Metadata.GetMetadata())
sr.indexRuleMap.Delete(key)
case EventKindIndexRuleBinding:
indexRuleBinding := evt.Metadata.(*databasev1.IndexRuleBinding)
col, _ := sr.bindingForwardMap.Load(getKey(&commonv1.Metadata{
Name: indexRuleBinding.Subject.GetName(),
Group: indexRuleBinding.GetMetadata().GetGroup(),
}))
if col == nil {
break
}
tMap := col.(*sync.Map)
key := getKey(indexRuleBinding.GetMetadata())
tMap.Delete(key)
for i := range indexRuleBinding.Rules {
col, _ := sr.bindingBackwardMap.Load(getKey(&commonv1.Metadata{
Name: indexRuleBinding.Rules[i],
Group: indexRuleBinding.GetMetadata().GetGroup(),
}))
if col == nil {
continue
}
tMap := col.(*sync.Map)
tMap.Delete(key)
}
}
}
if err != nil && !errors.Is(err, schema.ErrClosed) {
select {
case <-sr.closer.CloseNotify():
return
default:
}
sr.l.Err(err).Interface("event", evt).Msg("fail to handle the metadata event. retry...")
sr.metrics.totalErrs.Inc(1)
go func() {
sr.SendMetadataEvent(evt)
sr.metrics.totalRetries.Inc(1)
}()
}
case <-sr.closer.CloseNotify():
return
}
}
}()
}
}
func (sr *schemaRepo) storeGroup(groupMeta *commonv1.Metadata) (*group, error) {
name := groupMeta.GetName()
sr.groupMux.Lock()
defer sr.groupMux.Unlock()
g, ok := sr.getGroup(name)
if !ok {
sr.l.Info().Str("group", name).Msg("creating a tsdb")
g = sr.createGroup(name)
if err := g.init(name); err != nil {
return nil, err
}
return g, nil
}
if !g.isInit() {
if err := g.init(name); err != nil {
return nil, err
}
return g, nil
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
groupSchema, err := g.metadata.GroupRegistry().GetGroup(ctx, name)
if err != nil {
return nil, err
}
prevGroupSchema := g.GetSchema()
if groupSchema.GetMetadata().GetModRevision() <= prevGroupSchema.Metadata.ModRevision {
return g, nil
}
g.groupSchema.Store(groupSchema)
if proto.Equal(groupSchema, prevGroupSchema) {
return g, nil
}
sr.l.Info().Str("group", name).Msg("updating the group resource options")
g.db.Load().(DB).UpdateOptions(groupSchema.ResourceOpts)
return g, nil
}
func (sr *schemaRepo) createGroup(name string) (g *group) {
g = newGroup(sr.metadata, sr.l, sr.resourceSupplier)
sr.groupMap.Store(name, g)
return
}
func (sr *schemaRepo) deleteGroup(groupMeta *commonv1.Metadata) error {
name := groupMeta.GetName()
g, loaded := sr.groupMap.LoadAndDelete(name)
if !loaded {
return nil
}
return g.(*group).close()
}
func (sr *schemaRepo) getGroup(name string) (*group, bool) {
g, ok := sr.groupMap.Load(name)
if !ok {
return nil, false
}
return g.(*group), true
}
func (sr *schemaRepo) LoadGroup(name string) (Group, bool) {
g, ok := sr.getGroup(name)
if !ok {
return nil, false
}
return g, g.isInit()
}
func (sr *schemaRepo) LoadAllGroups() []Group {
var groups []Group
sr.groupMap.Range(func(_, value any) bool {
groups = append(groups, value.(*group))
return true
})
return groups
}
func (sr *schemaRepo) LoadResource(metadata *commonv1.Metadata) (Resource, bool) {
k := getKey(metadata)
s, ok := sr.resourceMap.Load(k)
if !ok {
return nil, false
}
return s.(Resource), true
}
func (sr *schemaRepo) storeResource(resourceSchema ResourceSchema) error {
sr.resourceMutex.Lock()
defer sr.resourceMutex.Unlock()
resource := &resourceSpec{
schema: resourceSchema,
}
key := getKey(resourceSchema.GetMetadata())
pre, loadedPre := sr.resourceMap.Load(key)
var preResource *resourceSpec
if loadedPre {
preResource = pre.(*resourceSpec)
}
if loadedPre && preResource.isNewThan(resource) {
return nil
}
sm, err := sr.resourceSchemaSupplier.OpenResource(resource)
if err != nil {
return errors.WithMessage(err, "fails to open the resource")
}
sm.OnIndexUpdate(sr.indexRules(resourceSchema))
resource.delegated = sm
sr.resourceMap.Store(key, resource)
return nil
}
func (sr *schemaRepo) storeIndexRule(indexRule *databasev1.IndexRule) {
key := getKey(indexRule.GetMetadata())
if prev, loaded := sr.indexRuleMap.LoadOrStore(key, indexRule); loaded {
if prev.(*databasev1.IndexRule).GetMetadata().ModRevision <= indexRule.GetMetadata().ModRevision {
sr.indexRuleMap.Store(key, indexRule)
if col, _ := sr.bindingBackwardMap.Load(key); col != nil {
col.(*sync.Map).Range(func(_, value any) bool {
sr.updateIndex(value.(*databasev1.IndexRuleBinding))
return true
})
}
}
} else {
if col, _ := sr.bindingBackwardMap.Load(key); col != nil {
col.(*sync.Map).Range(func(_, value any) bool {
sr.updateIndex(value.(*databasev1.IndexRuleBinding))
return true
})
}
}
}
func (sr *schemaRepo) storeIndexRuleBinding(indexRuleBinding *databasev1.IndexRuleBinding) {
var changed bool
col, _ := sr.bindingForwardMap.LoadOrStore(getKey(&commonv1.Metadata{
Name: indexRuleBinding.Subject.GetName(),
Group: indexRuleBinding.GetMetadata().GetGroup(),
}), &sync.Map{})
tMap := col.(*sync.Map)
key := getKey(indexRuleBinding.GetMetadata())
if prev, loaded := tMap.LoadOrStore(key, indexRuleBinding); loaded {
if prev.(*databasev1.IndexRuleBinding).GetMetadata().ModRevision <= indexRuleBinding.GetMetadata().ModRevision {
tMap.Store(key, indexRuleBinding)
changed = true
}
} else {
changed = true
}
for i := range indexRuleBinding.Rules {
col, _ := sr.bindingBackwardMap.LoadOrStore(getKey(&commonv1.Metadata{
Name: indexRuleBinding.Rules[i],
Group: indexRuleBinding.GetMetadata().GetGroup(),
}), &sync.Map{})
tMap := col.(*sync.Map)
key := getKey(indexRuleBinding.GetMetadata())
if prev, loaded := tMap.LoadOrStore(key, indexRuleBinding); loaded {
if prev.(*databasev1.IndexRuleBinding).GetMetadata().ModRevision <= indexRuleBinding.GetMetadata().ModRevision {
tMap.Store(key, indexRuleBinding)
changed = true
}
} else {
changed = true
}
}
if !changed {
return
}
sr.updateIndex(indexRuleBinding)
}
func (sr *schemaRepo) updateIndex(binding *databasev1.IndexRuleBinding) {
if r, ok := sr.LoadResource(&commonv1.Metadata{
Name: binding.Subject.GetName(),
Group: binding.GetMetadata().GetGroup(),
}); ok {
r.Delegated().OnIndexUpdate(sr.indexRules(r.Schema()))
}
}
func (sr *schemaRepo) indexRules(schema ResourceSchema) []*databasev1.IndexRule {
n := schema.GetMetadata().GetName()
g := schema.GetMetadata().GetGroup()
col, _ := sr.bindingForwardMap.Load(getKey(&commonv1.Metadata{
Name: n,
Group: g,
}))
if col == nil {
return nil
}
tMap := col.(*sync.Map)
var indexRules []*databasev1.IndexRule
tMap.Range(func(_, value any) bool {
indexRuleBinding := value.(*databasev1.IndexRuleBinding)
for i := range indexRuleBinding.Rules {
if r, _ := sr.indexRuleMap.Load(getKey(&commonv1.Metadata{
Name: indexRuleBinding.Rules[i],
Group: g,
})); r != nil {
indexRules = append(indexRules, r.(*databasev1.IndexRule))
}
}
return true
})
return indexRules
}
func getKey(metadata *commonv1.Metadata) string {
return path.Join(metadata.GetGroup(), metadata.GetName())
}
func (sr *schemaRepo) deleteResource(metadata *commonv1.Metadata) {
key := getKey(metadata)
_, _ = sr.resourceMap.LoadAndDelete(key)
}
func (sr *schemaRepo) Close() {
defer func() {
if err := recover(); err != nil {
sr.l.Warn().Interface("err", err).Msg("closing resource")
}
}()
sr.closer.CloseThenWait()
close(sr.eventCh)
sr.groupMux.Lock()
defer sr.groupMux.Unlock()
sr.groupMap.Range(func(_, value any) bool {
if value == nil {
return true
}
g, ok := value.(*group)
if !ok {
return true
}
err := g.close()
if err != nil {
sr.l.Err(err).RawJSON("group", logger.Proto(g.GetSchema().Metadata)).Msg("closing")
}
return true
})
sr.groupMap = sync.Map{}
}
var _ Group = (*group)(nil)
type group struct {
resourceSupplier ResourceSupplier
metadata metadata.Repo
db atomic.Value
groupSchema atomic.Pointer[commonv1.Group]
l *logger.Logger
}
func newGroup(
metadata metadata.Repo,
l *logger.Logger,
resourceSupplier ResourceSupplier,
) *group {
g := &group{
groupSchema: atomic.Pointer[commonv1.Group]{},
metadata: metadata,
l: l,
resourceSupplier: resourceSupplier,
}
return g
}
func (g *group) init(name string) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
groupSchema, err := g.metadata.GroupRegistry().GetGroup(ctx, name)
if errors.As(err, schema.ErrGRPCResourceNotFound) {
return nil
}
if err != nil {
return err
}
return g.initBySchema(groupSchema)
}
func (g *group) initBySchema(groupSchema *commonv1.Group) error {
g.groupSchema.Store(groupSchema)
if g.isPortable() {
return nil
}
db, err := g.resourceSupplier.OpenDB(groupSchema)
if err != nil {
return err
}
g.db.Store(db)
return err
}
func (g *group) isInit() bool {
return g.GetSchema() != nil
}
func (g *group) GetSchema() *commonv1.Group {
return g.groupSchema.Load()
}
func (g *group) SupplyTSDB() io.Closer {
return g.db.Load().(io.Closer)
}
func (g *group) isPortable() bool {
return g.resourceSupplier == nil
}
func (g *group) close() (err error) {
if !g.isInit() || g.isPortable() {
return nil
}
return multierr.Append(err, g.SupplyTSDB().Close())
}